Add error-handler callback and ensure batch2 completion handler can't be called twice (#3752)

* rename field that drives everything includes

* discovered design flaw in completion

* fix intermittent

* ensure batch2 completion handler can't be called twice

* changelog

* changelog

* javadoc

* bump hapi version

* licences

* Batch2 job definitions can now optionally provide an error handler callback that will be called when a job
instance fails, errors or is cancelled.

* Changed error handler and completion handler to take a "read-only" copy of an instance instead of just certain fields.

* comment

* javadoc

* fix test

* change step execution context from taking instance id to instance

* review feedback

* fix test

* replace 20 or so jetbrains annotations

* fixed a few incorrect @NotNull annotations

Co-authored-by: Ken Stevens <ken@smilecdr.com>
Author: Ken Stevens, 2022-07-04 23:27:58 -04:00 (committed via GitHub)
Parent commit: a13f2411a1
This commit: 3a28920ea7
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
118 changed files with 773 additions and 240 deletions

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -3,14 +3,14 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId> <artifactId>hapi-fhir-bom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>HAPI FHIR BOM</name> <name>HAPI FHIR BOM</name>
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -20,9 +20,9 @@ package ca.uhn.fhir.cli;
* #L% * #L%
*/ */
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor; import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
@ -46,11 +46,11 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClientBuilder;
import org.hl7.fhir.instance.model.api.IBaseBundle; import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.util.Base64Utils; import org.springframework.util.Base64Utils;
import javax.annotation.Nullable;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.Console; import java.io.Console;
import java.io.File; import java.io.File;

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId> <artifactId>hapi-fhir-cli</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath> <relativePath>../../hapi-deployable-pom</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 3752
title: "It was possible under certain race conditions for the batch2 completion handler to be called twice. This has been corrected."

View File

@ -0,0 +1,5 @@
---
type: add
issue: 3752
title: "Batch2 job definitions can now optionally provide an error handler callback that will be called when a job
instance fails, errors or is cancelled."

View File

@ -11,7 +11,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -20,10 +20,10 @@ package ca.uhn.fhir.jpa.batch.processor;
* #L% * #L%
*/ */
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.fhirpath.IFhirPath; import ca.uhn.fhir.fhirpath.IFhirPath;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.batch.config.BatchConstants; import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.batch.log.Logs; import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
@ -34,13 +34,12 @@ import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseExtension; import org.hl7.fhir.instance.model.api.IBaseExtension;
import org.hl7.fhir.instance.model.api.IBaseReference; import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.NonNull;
import javax.annotation.Nonnull;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -84,7 +83,7 @@ public class GoldenResourceAnnotatingProcessor implements ItemProcessor<List<IBa
} }
@Override @Override
public List<IBaseResource> process(@NonNull List<IBaseResource> theIBaseResources) throws Exception { public List<IBaseResource> process(@Nonnull List<IBaseResource> theIBaseResources) throws Exception {
if (shouldAnnotateResource()) { if (shouldAnnotateResource()) {
lazyLoadSearchParamsAndFhirPath(); lazyLoadSearchParamsAndFhirPath();
theIBaseResources.forEach(this::annotateBackwardsReferences); theIBaseResources.forEach(this::annotateBackwardsReferences);

View File

@ -171,8 +171,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
} }
private JobInstance toInstance(Batch2JobInstanceEntity theEntity) { private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
JobInstance retVal = new JobInstance(); JobInstance retVal = JobInstance.fromInstanceId(theEntity.getId());
retVal.setInstanceId(theEntity.getId());
retVal.setJobDefinitionId(theEntity.getDefinitionId()); retVal.setJobDefinitionId(theEntity.getDefinitionId());
retVal.setJobDefinitionVersion(theEntity.getDefinitionVersion()); retVal.setJobDefinitionVersion(theEntity.getDefinitionVersion());
retVal.setStatus(theEntity.getStatus()); retVal.setStatus(theEntity.getStatus());
@ -241,8 +240,17 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer)); return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
} }
/**
* Update the stored instance
*
* @param theInstance The instance - Must contain an ID
* @return true if the status changed
*/
@Override @Override
public void updateInstance(JobInstance theInstance) { public boolean updateInstance(JobInstance theInstance) {
// Separate updating the status so we have atomic information about whether the status is changing
int recordsChangedByStatusUpdate = myJobInstanceRepository.updateInstanceStatus(theInstance.getInstanceId(), theInstance.getStatus());
Optional<Batch2JobInstanceEntity> instanceOpt = myJobInstanceRepository.findById(theInstance.getInstanceId()); Optional<Batch2JobInstanceEntity> instanceOpt = myJobInstanceRepository.findById(theInstance.getInstanceId());
Batch2JobInstanceEntity instance = instanceOpt.orElseThrow(() -> new IllegalArgumentException("Unknown instance ID: " + theInstance.getInstanceId())); Batch2JobInstanceEntity instance = instanceOpt.orElseThrow(() -> new IllegalArgumentException("Unknown instance ID: " + theInstance.getInstanceId()));
@ -262,6 +270,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
instance.setReport(theInstance.getReport()); instance.setReport(theInstance.getReport());
myJobInstanceRepository.save(instance); myJobInstanceRepository.save(instance);
return recordsChangedByStatusUpdate > 0;
} }
@Override @Override
@ -276,8 +285,9 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
} }
@Override @Override
public void markInstanceAsCompleted(String theInstanceId) { public boolean markInstanceAsCompleted(String theInstanceId) {
myJobInstanceRepository.updateInstanceStatus(theInstanceId, StatusEnum.COMPLETED); int recordsChanged = myJobInstanceRepository.updateInstanceStatus(theInstanceId, StatusEnum.COMPLETED);
return recordsChanged > 0;
} }
@Override @Override

View File

@ -112,7 +112,6 @@ import org.hl7.fhir.instance.model.api.IDomainResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Bundle.HTTPVerb; import org.hl7.fhir.r4.model.Bundle.HTTPVerb;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
@ -1576,7 +1575,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
encodedResource.setEncoding(theEncoding); encodedResource.setEncoding(theEncoding);
} }
@NotNull @Nonnull
private Map<String, Boolean> getSearchParamPresenceMap(ResourceTable entity, ResourceIndexedSearchParams newParams) { private Map<String, Boolean> getSearchParamPresenceMap(ResourceTable entity, ResourceIndexedSearchParams newParams) {
Map<String, Boolean> retval = new HashMap<>(); Map<String, Boolean> retval = new HashMap<>();

View File

@ -35,7 +35,7 @@ public interface IBatch2JobInstanceRepository extends JpaRepository<Batch2JobIns
@Modifying @Modifying
@Query("UPDATE Batch2JobInstanceEntity e SET e.myStatus = :status WHERE e.myId = :id") @Query("UPDATE Batch2JobInstanceEntity e SET e.myStatus = :status WHERE e.myId = :id")
void updateInstanceStatus(@Param("id") String theInstanceId, @Param("status") StatusEnum theStatus); int updateInstanceStatus(@Param("id") String theInstanceId, @Param("status") StatusEnum theStatus);
@Modifying @Modifying
@Query("UPDATE Batch2JobInstanceEntity e SET e.myCancelled = :cancelled WHERE e.myId = :id") @Query("UPDATE Batch2JobInstanceEntity e SET e.myCancelled = :cancelled WHERE e.myId = :id")

View File

@ -36,8 +36,8 @@ import com.google.common.base.Strings;
import org.hl7.fhir.instance.model.api.IBase; import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseCoding; import org.hl7.fhir.instance.model.api.IBaseCoding;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.jetbrains.annotations.NotNull;
import javax.annotation.Nonnull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -69,7 +69,7 @@ public class ExtendedHSearchIndexExtractor {
myModelConfig = theModelConfig; myModelConfig = theModelConfig;
} }
@NotNull @Nonnull
public ExtendedHSearchIndexData extract(IBaseResource theResource, ResourceIndexedSearchParams theNewParams) { public ExtendedHSearchIndexData extract(IBaseResource theResource, ResourceIndexedSearchParams theNewParams) {
ExtendedHSearchIndexData retVal = new ExtendedHSearchIndexData(myContext, myModelConfig); ExtendedHSearchIndexData retVal = new ExtendedHSearchIndexData(myContext, myModelConfig);

View File

@ -52,11 +52,11 @@ import org.hl7.fhir.r5.model.SearchParameter;
import org.hl7.fhir.r5.model.StructureDefinition; import org.hl7.fhir.r5.model.StructureDefinition;
import org.hl7.fhir.r5.utils.GraphQLSchemaGenerator; import org.hl7.fhir.r5.utils.GraphQLSchemaGenerator;
import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices; import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.io.Writer; import java.io.Writer;
import java.util.ArrayList; import java.util.ArrayList;

View File

@ -49,9 +49,7 @@ import javax.servlet.http.HttpServletRequest;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableSet;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
@ -201,6 +199,9 @@ public class JpaConformanceProviderDstu3 extends org.hl7.fhir.dstu3.hapi.rest.se
ResourceSearchParams serverConfigurationActiveSearchParams = myServerConfiguration.getActiveSearchParams(theResourceName); ResourceSearchParams serverConfigurationActiveSearchParams = myServerConfiguration.getActiveSearchParams(theResourceName);
if (mySearchParamRegistry != null) { if (mySearchParamRegistry != null) {
searchParams = mySearchParamRegistry.getActiveSearchParams(theResourceName).makeCopy(); searchParams = mySearchParamRegistry.getActiveSearchParams(theResourceName).makeCopy();
if (searchParams == null) {
return ResourceSearchParams.empty(theResourceName);
}
for (String nextBuiltInSpName : serverConfigurationActiveSearchParams.getSearchParamNames()) { for (String nextBuiltInSpName : serverConfigurationActiveSearchParams.getSearchParamNames()) {
if (nextBuiltInSpName.startsWith("_") && if (nextBuiltInSpName.startsWith("_") &&
!searchParams.containsParamName(nextBuiltInSpName) && !searchParams.containsParamName(nextBuiltInSpName) &&

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.search; package ca.uhn.fhir.jpa.search;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.apache.lucene.analysis.core.KeywordTokenizerFactory; import org.apache.lucene.analysis.core.KeywordTokenizerFactory;
import org.apache.lucene.analysis.core.LowerCaseFilterFactory; import org.apache.lucene.analysis.core.LowerCaseFilterFactory;
import org.apache.lucene.analysis.core.StopFilterFactory; import org.apache.lucene.analysis.core.StopFilterFactory;

View File

@ -1464,7 +1464,7 @@ public class SearchBuilder implements ISearchBuilder {
private IncludesIterator myIncludesIterator; private IncludesIterator myIncludesIterator;
private ResourcePersistentId myNext; private ResourcePersistentId myNext;
private ISearchQueryExecutor myResultsIterator; private ISearchQueryExecutor myResultsIterator;
private boolean myStillNeedToFetchIncludes; private boolean myFetchIncludesForEverythingOperation;
private int mySkipCount = 0; private int mySkipCount = 0;
private int myNonSkipCount = 0; private int myNonSkipCount = 0;
private List<ISearchQueryExecutor> myQueryList = new ArrayList<>(); private List<ISearchQueryExecutor> myQueryList = new ArrayList<>();
@ -1477,7 +1477,7 @@ public class SearchBuilder implements ISearchBuilder {
// Includes are processed inline for $everything query // Includes are processed inline for $everything query
if (myParams.getEverythingMode() != null) { if (myParams.getEverythingMode() != null) {
myStillNeedToFetchIncludes = true; myFetchIncludesForEverythingOperation = true;
} }
myHavePerfTraceFoundIdHook = CompositeInterceptorBroadcaster.hasHooks(Pointcut.JPA_PERFTRACE_SEARCH_FOUND_ID, myInterceptorBroadcaster, myRequest); myHavePerfTraceFoundIdHook = CompositeInterceptorBroadcaster.hasHooks(Pointcut.JPA_PERFTRACE_SEARCH_FOUND_ID, myInterceptorBroadcaster, myRequest);
@ -1572,9 +1572,9 @@ public class SearchBuilder implements ISearchBuilder {
} }
if (myNext == null) { if (myNext == null) {
if (myStillNeedToFetchIncludes) { if (myFetchIncludesForEverythingOperation) {
myIncludesIterator = new IncludesIterator(myPidSet, myRequest); myIncludesIterator = new IncludesIterator(myPidSet, myRequest);
myStillNeedToFetchIncludes = false; myFetchIncludesForEverythingOperation = false;
} }
if (myIncludesIterator != null) { if (myIncludesIterator != null) {
while (myIncludesIterator.hasNext()) { while (myIncludesIterator.hasNext()) {

View File

@ -21,9 +21,7 @@ package ca.uhn.fhir.jpa.term.job;
*/ */
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao; import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao;
import ca.uhn.fhir.jpa.entity.TermCodeSystem; import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepContribution;
@ -33,6 +31,8 @@ import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Nonnull;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.JOB_PARAM_CODE_SYSTEM_ID; import static ca.uhn.fhir.jpa.batch.config.BatchConstants.JOB_PARAM_CODE_SYSTEM_ID;
@Component @Component
@ -43,7 +43,7 @@ public class TermCodeSystemDeleteTasklet implements Tasklet {
private ITermCodeSystemDao myTermCodeSystemDao; private ITermCodeSystemDao myTermCodeSystemDao;
@Override @Override
public RepeatStatus execute(@NotNull StepContribution contribution, ChunkContext context) throws Exception { public RepeatStatus execute(@Nonnull StepContribution contribution, ChunkContext context) throws Exception {
long codeSystemPid = (Long) context.getStepContext().getJobParameters().get(JOB_PARAM_CODE_SYSTEM_ID); long codeSystemPid = (Long) context.getStepContext().getJobParameters().get(JOB_PARAM_CODE_SYSTEM_ID);
TermCodeSystem cs = myTermCodeSystemDao.findById(codeSystemPid).orElseThrow(IllegalStateException::new); TermCodeSystem cs = myTermCodeSystemDao.findById(codeSystemPid).orElseThrow(IllegalStateException::new);

View File

@ -7,7 +7,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -1,9 +1,8 @@
package ca.uhn.fhir.mdm.batch2.clear; package ca.uhn.fhir.mdm.batch2.clear;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.entity.MdmLink; import ca.uhn.fhir.jpa.entity.MdmLink;
import ca.uhn.fhir.jpa.mdm.BaseMdmR4Test; import ca.uhn.fhir.jpa.mdm.BaseMdmR4Test;
import ca.uhn.fhir.jpa.mdm.helper.MdmHelperR4; import ca.uhn.fhir.jpa.mdm.helper.MdmHelperR4;
@ -17,12 +16,11 @@ import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException; import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference; import org.hl7.fhir.r4.model.Reference;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.UUID; import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -37,8 +35,6 @@ class MdmClearStepTest extends BaseMdmR4Test {
@Autowired @Autowired
MdmHelperR4 myMdmHelperR4; MdmHelperR4 myMdmHelperR4;
@Mock
IJobDataSink<VoidModel> myDataSink;
private Long mySourcePid; private Long mySourcePid;
private Long myGoldenPid; private Long myGoldenPid;
private MdmLink myLink; private MdmLink myLink;
@ -106,13 +102,14 @@ class MdmClearStepTest extends BaseMdmR4Test {
myMdmClearStep.myHapiTransactionService.execute(requestDetails, transactionDetails, myMdmClearStep.buildJob(requestDetails, transactionDetails, stepExecutionDetails)); myMdmClearStep.myHapiTransactionService.execute(requestDetails, transactionDetails, myMdmClearStep.buildJob(requestDetails, transactionDetails, stepExecutionDetails));
} }
@NotNull @Nonnull
private StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> buildStepExecutionDetails(ResourceIdListWorkChunkJson chunk) { private StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> buildStepExecutionDetails(ResourceIdListWorkChunkJson chunk) {
String instanceId = UUID.randomUUID().toString(); String instanceId = UUID.randomUUID().toString();
JobInstance jobInstance = JobInstance.fromInstanceId(instanceId);
String chunkid = UUID.randomUUID().toString(); String chunkid = UUID.randomUUID().toString();
MdmClearJobParameters parms = new MdmClearJobParameters(); MdmClearJobParameters parms = new MdmClearJobParameters();
StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(parms, chunk, instanceId, chunkid); StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(parms, chunk, jobInstance, chunkid);
return stepExecutionDetails; return stepExecutionDetails;
} }

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.searchparam.matcher; package ca.uhn.fhir.jpa.searchparam.matcher;
/*-
* #%L
* HAPI FHIR Search Parameters
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.rest.server.interceptor.auth.IAuthorizationSearchParamMatcher; import ca.uhn.fhir.rest.server.interceptor.auth.IAuthorizationSearchParamMatcher;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -22,7 +22,6 @@ import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Patient;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -36,6 +35,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.GenericMessage;
import javax.annotation.Nonnull;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.time.LocalDate; import java.time.LocalDate;
import java.util.List; import java.util.List;
@ -276,14 +276,14 @@ public class BaseSubscriptionDeliverySubscriberTest {
assertEquals(jsonMessage.getPayload().getRequestPartitionId().toJson(), RequestPartitionId.defaultPartition().toJson()); assertEquals(jsonMessage.getPayload().getRequestPartitionId().toJson(), RequestPartitionId.defaultPartition().toJson());
} }
@NotNull @Nonnull
private Patient generatePatient() { private Patient generatePatient() {
Patient patient = new Patient(); Patient patient = new Patient();
patient.setActive(true); patient.setActive(true);
return patient; return patient;
} }
@NotNull @Nonnull
private CanonicalSubscription generateSubscription() { private CanonicalSubscription generateSubscription() {
CanonicalSubscription subscription = new CanonicalSubscription(); CanonicalSubscription subscription = new CanonicalSubscription();
subscription.setIdElement(new IdType("Subscription/123")); subscription.setIdElement(new IdType("Subscription/123"));

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -14,7 +14,6 @@ import ca.uhn.test.util.LogbackCaptureTestExtension;
import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Level;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -23,6 +22,7 @@ import org.mockito.ArgumentMatchers;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.junit.jupiter.MockitoSettings;
import javax.annotation.Nullable;
import java.util.HashSet; import java.util.HashSet;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;

View File

@ -9,18 +9,19 @@ import ca.uhn.fhir.rest.server.interceptor.auth.IAuthorizationSearchParamMatcher
import ca.uhn.fhir.rest.server.interceptor.auth.IRuleApplier; import ca.uhn.fhir.rest.server.interceptor.auth.IRuleApplier;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/** /**
* Empty implementation to base a stub. * Empty implementation to base a stub.
*/ */
public class TestRuleApplier implements IRuleApplier { public class TestRuleApplier implements IRuleApplier {
private static final Logger ourLog = LoggerFactory.getLogger(TestRuleApplier.class); private static final Logger ourLog = LoggerFactory.getLogger(TestRuleApplier.class);
@NotNull @Nonnull
@Override @Override
public Logger getTroubleshootingLog() { public Logger getTroubleshootingLog() {
return ourLog; return ourLog;

View File

@ -1,8 +1,10 @@
package ca.uhn.fhir.jpa.batch2; package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.ChunkExecutionDetails; import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker; import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker; import ca.uhn.fhir.batch2.api.IReductionStepWorker;
@ -19,9 +21,10 @@ import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper; import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil; import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.test.concurrency.LatchTimedOutError;
import ca.uhn.test.concurrency.PointcutLatch; import ca.uhn.test.concurrency.PointcutLatch;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -31,6 +34,9 @@ import javax.annotation.Nonnull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -49,6 +55,8 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
@Autowired @Autowired
IJobCoordinator myJobCoordinator; IJobCoordinator myJobCoordinator;
@Autowired @Autowired
IJobMaintenanceService myJobMaintenanceService;
@Autowired
Batch2JobHelper myBatch2JobHelper; Batch2JobHelper myBatch2JobHelper;
@Autowired @Autowired
@ -56,12 +64,18 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
private final PointcutLatch myFirstStepLatch = new PointcutLatch("First Step"); private final PointcutLatch myFirstStepLatch = new PointcutLatch("First Step");
private final PointcutLatch myLastStepLatch = new PointcutLatch("Last Step"); private final PointcutLatch myLastStepLatch = new PointcutLatch("Last Step");
private IJobCompletionHandler<TestJobParameters> myCompletionHandler;
private RunOutcome callLatch(PointcutLatch theLatch, StepExecutionDetails<?, ?> theStep) { private static RunOutcome callLatch(PointcutLatch theLatch, StepExecutionDetails<?, ?> theStep) {
theLatch.call(theStep); theLatch.call(theStep);
return RunOutcome.SUCCESS; return RunOutcome.SUCCESS;
} }
@BeforeEach
public void before() {
myCompletionHandler = details -> {};
}
@Test @Test
public void testFirstStepNoSink() throws InterruptedException { public void testFirstStepNoSink() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> callLatch(myFirstStepLatch, step); IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> callLatch(myFirstStepLatch, step);
@ -109,6 +123,73 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myLastStepLatch.awaitExpected(); myLastStepLatch.awaitExpected();
} }
@Test
public void testFastTrack_Maintenance_do_not_both_call_CompletionHandler() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobId = "test-job-2a";
String completionHandlerLatchName = "Completion Handler";
PointcutLatch calledLatch = new PointcutLatch(completionHandlerLatchName);
CountDownLatch waitLatch = new CountDownLatch(2);
myCompletionHandler = details -> {
try {
calledLatch.call(details);
waitLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
calledLatch.setExpectedCount(1);
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
calledLatch.awaitExpected();
myBatch2JobHelper.assertNoGatedStep(instanceId);
// Start a maintenance run in the background
ExecutorService executor = Executors.newSingleThreadExecutor();
// Now queue up the maintenance call
calledLatch.setExpectedCount(1);
executor.submit(() -> myJobMaintenanceService.runMaintenancePass());
// We should have only called the completion handler once
try {
// This test will pause for 5 seconds here. This should be more than enough time on most servers to hit the
// spot where the maintenance services calls the completion handler
calledLatch.awaitExpectedWithTimeout(5);
fail();
} catch (LatchTimedOutError e) {
assertEquals("HAPI-1483: " + completionHandlerLatchName + " PointcutLatch timed out waiting 5 seconds for latch to countdown from 1 to 0. Is 1.", e.getMessage());
}
// Now release the latches
waitLatch.countDown();
waitLatch.countDown(); // This shouldn't be necessary, but just in case
// Since there was only one chunk, the job should proceed without requiring a maintenance pass
myBatch2JobHelper.awaitSingleChunkJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
}
@Test @Test
public void testJobDefinitionWithReductionStepIT() throws InterruptedException { public void testJobDefinitionWithReductionStepIT() throws InterruptedException {
// setup // setup
@ -142,10 +223,10 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
return ChunkOutcome.SUCCESS(); return ChunkOutcome.SUCCESS();
} }
@NotNull @Nonnull
@Override @Override
public RunOutcome run(@NotNull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, public RunOutcome run(@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@NotNull IJobDataSink<ReductionStepOutput> theDataSink) throws JobExecutionFailedException { @Nonnull IJobDataSink<ReductionStepOutput> theDataSink) throws JobExecutionFailedException {
theDataSink.accept(new ReductionStepOutput(myOutput)); theDataSink.accept(new ReductionStepOutput(myOutput));
callLatch(myLastStepLatch, theStepExecutionDetails); callLatch(myLastStepLatch, theStepExecutionDetails);
return RunOutcome.SUCCESS; return RunOutcome.SUCCESS;
@ -318,6 +399,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
"Test last step", "Test last step",
theLastStep theLastStep
) )
.completionHandler(myCompletionHandler)
.build(); .build();
} }

View File

@ -11,7 +11,6 @@ import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender; import ch.qos.logback.core.Appender;
import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Patient;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -29,6 +28,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Nullable;
import javax.persistence.EntityExistsException; import javax.persistence.EntityExistsException;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import javax.persistence.NoResultException; import javax.persistence.NoResultException;
@ -43,8 +43,6 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;

View File

@ -4,7 +4,6 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.batch.config.BatchConstants; import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson; import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider; import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.entity.PartitionEntity; import ca.uhn.fhir.jpa.entity.PartitionEntity;
@ -31,7 +30,6 @@ import org.hl7.fhir.r4.model.Condition;
import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Organization; import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Patient;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -42,9 +40,7 @@ import java.util.Date;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;

View File

@ -20,6 +20,7 @@ import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference; import org.hl7.fhir.r4.model.Reference;
import org.hl7.fhir.r4.model.StringType; import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Task;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -45,6 +46,7 @@ public class PatientEverythingR4Test extends BaseResourceProviderR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(PatientEverythingR4Test.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(PatientEverythingR4Test.class);
private String orgId; private String orgId;
private String patId; private String patId;
private String taskId;
private String encId1; private String encId1;
private String encId2; private String encId2;
private ArrayList<String> myObsIds; private ArrayList<String> myObsIds;
@ -101,6 +103,11 @@ public class PatientEverythingR4Test extends BaseResourceProviderR4Test {
enc2.getServiceProvider().setReference(orgId); enc2.getServiceProvider().setReference(orgId);
encId2 = myClient.create().resource(enc2).execute().getId().toUnqualifiedVersionless().getValue(); encId2 = myClient.create().resource(enc2).execute().getId().toUnqualifiedVersionless().getValue();
Task task = new Task();
task.setStatus(Task.TaskStatus.COMPLETED);
task.getOwner().setReference(patId);
taskId = myClient.create().resource(task).execute().getId().toUnqualifiedVersionless().getValue();
Encounter wrongEnc1 = new Encounter(); Encounter wrongEnc1 = new Encounter();
wrongEnc1.setStatus(EncounterStatus.ARRIVED); wrongEnc1.setStatus(EncounterStatus.ARRIVED);
wrongEnc1.getSubject().setReference(myWrongPatId); wrongEnc1.getSubject().setReference(myWrongPatId);
@ -177,6 +184,7 @@ public class PatientEverythingR4Test extends BaseResourceProviderR4Test {
assertThat(actual, hasItem(encId1)); assertThat(actual, hasItem(encId1));
assertThat(actual, hasItem(encId2)); assertThat(actual, hasItem(encId2));
assertThat(actual, hasItem(orgId)); assertThat(actual, hasItem(orgId));
assertThat(actual, hasItem(taskId));
assertThat(actual, hasItems(myObsIds.toArray(new String[0]))); assertThat(actual, hasItems(myObsIds.toArray(new String[0])));
assertThat(actual, not(hasItem(myWrongPatId))); assertThat(actual, not(hasItem(myWrongPatId)));
assertThat(actual, not(hasItem(myWrongEnc1))); assertThat(actual, not(hasItem(myWrongEnc1)));

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -7,7 +7,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.rest.server.interceptor.auth; package ca.uhn.fhir.rest.server.interceptor.auth;
/*-
* #%L
* HAPI FHIR - Server Framework
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.rest.server.interceptor.auth; package ca.uhn.fhir.rest.server.interceptor.auth;
/*-
* #%L
* HAPI FHIR - Server Framework
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.rest.server.interceptor.consent; package ca.uhn.fhir.rest.server.interceptor.consent;
/*-
* #%L
* HAPI FHIR - Server Framework
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.interceptor.auth.AuthorizationInterceptor; import ca.uhn.fhir.rest.server.interceptor.auth.AuthorizationInterceptor;

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId> <artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
</parent> </parent>
<artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId> <artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId> <artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
</parent> </parent>
<artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId> <artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId> <artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
</parent> </parent>
<artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId> <artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot</artifactId> <artifactId>hapi-fhir-spring-boot</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
</parent> </parent>
<artifactId>hapi-fhir-spring-boot-samples</artifactId> <artifactId>hapi-fhir-spring-boot-samples</artifactId>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -67,7 +67,7 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData(); ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
return doReindex(data, theDataSink, theStepExecutionDetails.getInstanceId(), theStepExecutionDetails.getChunkId()); return doReindex(data, theDataSink, theStepExecutionDetails.getInstance().getInstanceId(), theStepExecutionDetails.getChunkId());
} }
@Nonnull @Nonnull

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException; import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.test.utilities.server.HttpServletExtension; import ca.uhn.fhir.test.utilities.server.HttpServletExtension;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -12,7 +13,6 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.util.Base64Utils; import org.springframework.util.Base64Utils;
import java.io.StringReader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -27,7 +27,9 @@ import static org.mockito.Mockito.verify;
public class FetchFilesStepTest { public class FetchFilesStepTest {
public static final String INSTANCE_ID = "instance-id"; public static final String INSTANCE_ID = "instance-id";
public static final JobInstance ourTestInstance = JobInstance.fromInstanceId(INSTANCE_ID);
public static final String CHUNK_ID = "chunk-id"; public static final String CHUNK_ID = "chunk-id";
private final BulkImportFileServlet myBulkImportFileServlet = new BulkImportFileServlet(); private final BulkImportFileServlet myBulkImportFileServlet = new BulkImportFileServlet();
@RegisterExtension @RegisterExtension
private final HttpServletExtension myHttpServletExtension = new HttpServletExtension() private final HttpServletExtension myHttpServletExtension = new HttpServletExtension()
@ -47,7 +49,7 @@ public class FetchFilesStepTest {
BulkImportJobParameters parameters = new BulkImportJobParameters() BulkImportJobParameters parameters = new BulkImportJobParameters()
.addNdJsonUrl(myHttpServletExtension.getBaseUrl() + "/download?index=" + index) .addNdJsonUrl(myHttpServletExtension.getBaseUrl() + "/download?index=" + index)
.setHttpBasicCredentials("admin:password"); .setHttpBasicCredentials("admin:password");
StepExecutionDetails<BulkImportJobParameters, VoidModel> details = new StepExecutionDetails<>(parameters, null, INSTANCE_ID, CHUNK_ID); StepExecutionDetails<BulkImportJobParameters, VoidModel> details = new StepExecutionDetails<>(parameters, null, ourTestInstance, CHUNK_ID);
// Test // Test
@ -76,7 +78,7 @@ public class FetchFilesStepTest {
BulkImportJobParameters parameters = new BulkImportJobParameters() BulkImportJobParameters parameters = new BulkImportJobParameters()
.addNdJsonUrl(myHttpServletExtension.getBaseUrl() + "/download?index=" + index) .addNdJsonUrl(myHttpServletExtension.getBaseUrl() + "/download?index=" + index)
.setMaxBatchResourceCount(3); .setMaxBatchResourceCount(3);
StepExecutionDetails<BulkImportJobParameters, VoidModel> details = new StepExecutionDetails<>(parameters, null, INSTANCE_ID, CHUNK_ID); StepExecutionDetails<BulkImportJobParameters, VoidModel> details = new StepExecutionDetails<>(parameters, null, ourTestInstance, CHUNK_ID);
// Test // Test
@ -98,7 +100,7 @@ public class FetchFilesStepTest {
BulkImportJobParameters parameters = new BulkImportJobParameters() BulkImportJobParameters parameters = new BulkImportJobParameters()
.addNdJsonUrl(myHttpServletExtension.getBaseUrl() + "/download?index=" + index) .addNdJsonUrl(myHttpServletExtension.getBaseUrl() + "/download?index=" + index)
.setHttpBasicCredentials("admin"); .setHttpBasicCredentials("admin");
StepExecutionDetails<BulkImportJobParameters, VoidModel> details = new StepExecutionDetails<>(parameters, null, INSTANCE_ID, CHUNK_ID); StepExecutionDetails<BulkImportJobParameters, VoidModel> details = new StepExecutionDetails<>(parameters, null, ourTestInstance, CHUNK_ID);
// Test & Verify // Test & Verify

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList; import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList; import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc; import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
@ -60,8 +61,9 @@ public class LoadIdsStepTest {
ReindexChunkRangeJson range = new ReindexChunkRangeJson(); ReindexChunkRangeJson range = new ReindexChunkRangeJson();
range.setStart(DATE_1).setEnd(DATE_END); range.setStart(DATE_1).setEnd(DATE_END);
String instanceId = "instance-id"; String instanceId = "instance-id";
JobInstance jobInstance = JobInstance.fromInstanceId(instanceId);
String chunkId = "chunk-id"; String chunkId = "chunk-id";
StepExecutionDetails<ReindexJobParameters, ReindexChunkRangeJson> details = new StepExecutionDetails<>(parameters, range, instanceId, chunkId); StepExecutionDetails<ReindexJobParameters, ReindexChunkRangeJson> details = new StepExecutionDetails<>(parameters, range, jobInstance, chunkId);
// First Execution // First Execution

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -0,0 +1,44 @@
package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.StatusEnum;
import java.util.Date;
public interface IJobInstance {
String getCurrentGatedStepId();
int getErrorCount();
String getEstimatedTimeRemaining();
boolean isWorkChunksPurged();
StatusEnum getStatus();
int getJobDefinitionVersion();
String getInstanceId();
Date getStartTime();
Date getEndTime();
Integer getCombinedRecordsProcessed();
Double getCombinedRecordsProcessedPerSecond();
Date getCreateTime();
Integer getTotalElapsedMillis();
double getProgress();
String getErrorMessage();
JobDefinition<?> getJobDefinition();
boolean isCancelled();
String getReport();
}

View File

@ -152,11 +152,13 @@ public interface IJobPersistence {
Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData); Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
/** /**
* Update the stored instance * Update the stored instance. If the status is changing, use {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater}
* instead to ensure state-change callbacks are invoked properly.
* *
* @param theInstance The instance - Must contain an ID * @param theInstance The instance - Must contain an ID
* @return true if the status changed
*/ */
void updateInstance(JobInstance theInstance); boolean updateInstance(JobInstance theInstance);
/** /**
* Deletes the instance and all associated work chunks * Deletes the instance and all associated work chunks
@ -176,8 +178,9 @@ public interface IJobPersistence {
* Marks an instance as being complete * Marks an instance as being complete
* *
* @param theInstanceId The instance ID * @param theInstanceId The instance ID
* @return true if the instance status changed
*/ */
void markInstanceAsCompleted(String theInstanceId); boolean markInstanceAsCompleted(String theInstanceId);
/** /**
* Marks an instance as cancelled * Marks an instance as cancelled

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.batch2.api;
* #L% * #L%
*/ */
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
@ -28,12 +29,14 @@ import javax.annotation.Nonnull;
public class JobCompletionDetails<PT extends IModelJson> { public class JobCompletionDetails<PT extends IModelJson> {
private final PT myParameters; private final PT myParameters;
private final String myInstanceId; private final IJobInstance myInstance;
public JobCompletionDetails(@Nonnull PT theParameters, @Nonnull String theInstanceId) { public JobCompletionDetails(@Nonnull PT theParameters, @Nonnull JobInstance theInstance) {
Validate.notNull(theParameters); Validate.notNull(theParameters);
myParameters = theParameters; myParameters = theParameters;
myInstanceId = theInstanceId; // Make a copy of the instance. Even though the interface is read-only, we don't want to risk users of this API
// casting the instance and changing values inside it.
myInstance = new JobInstance(theInstance);
} }
/** /**
@ -49,8 +52,8 @@ public class JobCompletionDetails<PT extends IModelJson> {
* Returns the job instance ID being executed * Returns the job instance ID being executed
*/ */
@Nonnull @Nonnull
public String getInstanceId() { public IJobInstance getInstance() {
return myInstanceId; return myInstance;
} }
} }

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.batch2.api; package ca.uhn.fhir.batch2.api;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -1,9 +1,8 @@
package ca.uhn.fhir.batch2.api; package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.model.ListResult; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -39,8 +38,9 @@ public class ReductionStepExecutionDetails<PT extends IModelJson, IT extends IMo
public ReductionStepExecutionDetails(@Nonnull PT theParameters, public ReductionStepExecutionDetails(@Nonnull PT theParameters,
@Nullable IT theData, @Nullable IT theData,
@Nonnull String theInstanceId) { @Nonnull JobInstance theInstance) {
super(theParameters, theData, theInstanceId, "VOID"); // TODO KHS shouldn't the chunkId be null?
super(theParameters, theData, theInstance, "VOID");
} }
@Override @Override

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.batch2.api;
* #L% * #L%
*/ */
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
@ -30,17 +31,18 @@ public class StepExecutionDetails<PT extends IModelJson, IT extends IModelJson>
private final PT myParameters; private final PT myParameters;
private final IT myData; private final IT myData;
private final String myInstanceId; private final IJobInstance myInstance;
private final String myChunkId; private final String myChunkId;
public StepExecutionDetails(@Nonnull PT theParameters, public StepExecutionDetails(@Nonnull PT theParameters,
@Nullable IT theData, @Nullable IT theData,
@Nonnull String theInstanceId, @Nonnull JobInstance theInstance,
@Nonnull String theChunkId) { @Nonnull String theChunkId) {
Validate.notNull(theParameters); Validate.notNull(theParameters);
myParameters = theParameters; myParameters = theParameters;
myData = theData; myData = theData;
myInstanceId = theInstanceId; // Make a copy so the step worker can't change the one passed in
myInstance = new JobInstance(theInstance);
myChunkId = theChunkId; myChunkId = theChunkId;
} }
@ -71,8 +73,8 @@ public class StepExecutionDetails<PT extends IModelJson, IT extends IModelJson>
* Returns the job instance ID being executed * Returns the job instance ID being executed
*/ */
@Nonnull @Nonnull
public String getInstanceId() { public IJobInstance getInstance() {
return myInstanceId; return myInstance;
} }
/** /**

View File

@ -113,7 +113,7 @@ public class JobDefinitionRegistry {
} }
public void setJobDefinition(JobInstance theInstance) { public void setJobDefinition(JobInstance theInstance) {
JobDefinition<?> jobDefinition = getJobDefinitionOrThrowException(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion()); JobDefinition<?> jobDefinition = getJobDefinitionOrThrowException(theInstance);
theInstance.setJobDefinition(jobDefinition); theInstance.setJobDefinition(jobDefinition);
} }
@ -130,4 +130,12 @@ public class JobDefinitionRegistry {
public boolean isEmpty() { public boolean isEmpty() {
return myJobs.isEmpty(); return myJobs.isEmpty();
} }
public Optional<JobDefinition<?>> getJobDefinition(JobInstance theJobInstance) {
return getJobDefinition(theJobInstance.getJobDefinitionId(), theJobInstance.getJobDefinitionVersion());
}
public JobDefinition<?> getJobDefinitionOrThrowException(JobInstance theJobInstance) {
return getJobDefinitionOrThrowException(theJobInstance.getJobDefinitionId(), theJobInstance.getJobDefinitionVersion());
}
} }

View File

@ -77,7 +77,7 @@ class JobQuerySvc {
private JobInstance massageInstanceForUserAccess(JobInstance theInstance) { private JobInstance massageInstanceForUserAccess(JobInstance theInstance) {
JobInstance retVal = new JobInstance(theInstance); JobInstance retVal = new JobInstance(theInstance);
JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(theInstance.getJobDefinitionId(), theInstance.getJobDefinitionVersion()); JobDefinition<?> definition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(theInstance);
// Serializing the parameters strips any write-only params // Serializing the parameters strips any write-only params
IModelJson parameters = retVal.getParameters(definition.getParametersType()); IModelJson parameters = retVal.getParameters(definition.getParametersType());

View File

@ -100,7 +100,7 @@ public class StepExecutionSvc {
} else { } else {
// all other kinds of steps // all other kinds of steps
Validate.notNull(theWorkChunk); Validate.notNull(theWorkChunk);
stepExecutionDetails = getExecutionDetailsForNonReductionStep(theWorkChunk, instanceId, inputType, parameters); stepExecutionDetails = getExecutionDetailsForNonReductionStep(theWorkChunk, theInstance, inputType, parameters);
// execute the step // execute the step
boolean success = executeStep(stepExecutionDetails, boolean success = executeStep(stepExecutionDetails,
@ -143,7 +143,7 @@ public class StepExecutionSvc {
*/ */
private <PT extends IModelJson, IT extends IModelJson> StepExecutionDetails<PT, IT> getExecutionDetailsForNonReductionStep( private <PT extends IModelJson, IT extends IModelJson> StepExecutionDetails<PT, IT> getExecutionDetailsForNonReductionStep(
WorkChunk theWorkChunk, WorkChunk theWorkChunk,
String theInstanceId, JobInstance theInstance,
Class<IT> theInputType, Class<IT> theInputType,
PT theParameters PT theParameters
) { ) {
@ -158,7 +158,7 @@ public class StepExecutionSvc {
stepExecutionDetails = new StepExecutionDetails<>( stepExecutionDetails = new StepExecutionDetails<>(
theParameters, theParameters,
inputData, inputData,
theInstanceId, theInstance,
chunkId chunkId
); );
return stepExecutionDetails; return stepExecutionDetails;
@ -267,7 +267,7 @@ public class StepExecutionSvc {
ReductionStepExecutionDetails<PT, IT, OT> executionDetails = new ReductionStepExecutionDetails<>( ReductionStepExecutionDetails<PT, IT, OT> executionDetails = new ReductionStepExecutionDetails<>(
theParameters, theParameters,
null, null,
theInstance.getInstanceId() theInstance
); );
return executeStep(executionDetails, return executeStep(executionDetails,

View File

@ -124,8 +124,8 @@ public class SynchronizedJobPersistenceWrapper implements IJobPersistence {
@Override @Override
public synchronized void updateInstance(JobInstance theInstance) { public synchronized boolean updateInstance(JobInstance theInstance) {
myWrap.updateInstance(theInstance); return myWrap.updateInstance(theInstance);
} }
@Override @Override
@ -139,8 +139,8 @@ public class SynchronizedJobPersistenceWrapper implements IJobPersistence {
} }
@Override @Override
public synchronized void markInstanceAsCompleted(String theInstanceId) { public synchronized boolean markInstanceAsCompleted(String theInstanceId) {
myWrap.markInstanceAsCompleted(theInstanceId); return myWrap.markInstanceAsCompleted(theInstanceId);
} }
@Override @Override

View File

@ -29,6 +29,7 @@ import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator; import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -47,6 +48,7 @@ public class JobInstanceProcessor {
private final JobChunkProgressAccumulator myProgressAccumulator; private final JobChunkProgressAccumulator myProgressAccumulator;
private final JobInstanceProgressCalculator myJobInstanceProgressCalculator; private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
private final StepExecutionSvc myJobExecutorSvc; private final StepExecutionSvc myJobExecutorSvc;
private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
JobInstanceProcessor(IJobPersistence theJobPersistence, JobInstanceProcessor(IJobPersistence theJobPersistence,
BatchJobSender theBatchJobSender, BatchJobSender theBatchJobSender,
@ -60,6 +62,7 @@ public class JobInstanceProcessor {
myJobExecutorSvc = theExecutorSvc; myJobExecutorSvc = theExecutorSvc;
myProgressAccumulator = theProgressAccumulator; myProgressAccumulator = theProgressAccumulator;
myJobInstanceProgressCalculator = new JobInstanceProgressCalculator(theJobPersistence, theInstance, theProgressAccumulator); myJobInstanceProgressCalculator = new JobInstanceProgressCalculator(theJobPersistence, theInstance, theProgressAccumulator);
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobPersistence);
} }
public void process() { public void process() {
@ -72,7 +75,7 @@ public class JobInstanceProcessor {
if (myInstance.isPendingCancellation()) { if (myInstance.isPendingCancellation()) {
myInstance.setErrorMessage(buildCancelledMessage()); myInstance.setErrorMessage(buildCancelledMessage());
myInstance.setStatus(StatusEnum.CANCELLED); myInstance.setStatus(StatusEnum.CANCELLED);
myJobPersistence.updateInstance(myInstance); myJobInstanceStatusUpdater.updateInstance(myInstance);
} }
} }
@ -174,16 +177,8 @@ public class JobInstanceProcessor {
null); null);
if (!result.isSuccessful()) { if (!result.isSuccessful()) {
myInstance.setStatus(StatusEnum.FAILED); myInstance.setStatus(StatusEnum.FAILED);
myJobPersistence.updateInstance(myInstance); myJobInstanceStatusUpdater.updateInstance(myInstance);
} }
} }
public static boolean updateInstanceStatus(JobInstance myInstance, StatusEnum newStatus) {
if (myInstance.getStatus() != newStatus) {
ourLog.info("Marking job instance {} of type {} as {}", myInstance.getInstanceId(), myInstance.getJobDefinitionId(), newStatus);
myInstance.setStatus(newStatus);
return true;
}
return false;
}
} }

View File

@ -34,7 +34,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -51,11 +50,12 @@ public class JobDefinition<PT extends IModelJson> {
private final boolean myGatedExecution; private final boolean myGatedExecution;
private final List<String> myStepIds; private final List<String> myStepIds;
private final IJobCompletionHandler<PT> myCompletionHandler; private final IJobCompletionHandler<PT> myCompletionHandler;
private final IJobCompletionHandler<PT> myErrorHandler;
/** /**
* Constructor * Constructor
*/ */
private JobDefinition(String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theParametersType, List<JobDefinitionStep<PT, ?, ?>> theSteps, IJobParametersValidator<PT> theParametersValidator, boolean theGatedExecution, IJobCompletionHandler<PT> theCompletionHandler) { private JobDefinition(String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theParametersType, List<JobDefinitionStep<PT, ?, ?>> theSteps, IJobParametersValidator<PT> theParametersValidator, boolean theGatedExecution, IJobCompletionHandler<PT> theCompletionHandler, IJobCompletionHandler<PT> theErrorHandler) {
Validate.isTrue(theJobDefinitionId.length() <= ID_MAX_LENGTH, "Maximum ID length is %d", ID_MAX_LENGTH); Validate.isTrue(theJobDefinitionId.length() <= ID_MAX_LENGTH, "Maximum ID length is %d", ID_MAX_LENGTH);
Validate.notBlank(theJobDefinitionId, "No job definition ID supplied"); Validate.notBlank(theJobDefinitionId, "No job definition ID supplied");
Validate.notBlank(theJobDescription, "No job description supplied"); Validate.notBlank(theJobDescription, "No job description supplied");
@ -70,6 +70,7 @@ public class JobDefinition<PT extends IModelJson> {
myParametersValidator = theParametersValidator; myParametersValidator = theParametersValidator;
myGatedExecution = theGatedExecution; myGatedExecution = theGatedExecution;
myCompletionHandler = theCompletionHandler; myCompletionHandler = theCompletionHandler;
myErrorHandler = theErrorHandler;
} }
@Nullable @Nullable
@ -77,6 +78,11 @@ public class JobDefinition<PT extends IModelJson> {
return myCompletionHandler; return myCompletionHandler;
} }
@Nullable
public IJobCompletionHandler<PT> getErrorHandler() {
return myErrorHandler;
}
@Nullable @Nullable
public IJobParametersValidator<PT> getParametersValidator() { public IJobParametersValidator<PT> getParametersValidator() {
return myParametersValidator; return myParametersValidator;
@ -135,10 +141,6 @@ public class JobDefinition<PT extends IModelJson> {
return retVal; return retVal;
} }
public boolean isLastStep(String theStepId) {
return getStepIndex(theStepId) == (myStepIds.size() - 1);
}
public static class Builder<PT extends IModelJson, NIT extends IModelJson> { public static class Builder<PT extends IModelJson, NIT extends IModelJson> {
private final List<JobDefinitionStep<PT, ?, ?>> mySteps; private final List<JobDefinitionStep<PT, ?, ?>> mySteps;
@ -151,12 +153,13 @@ public class JobDefinition<PT extends IModelJson> {
private IJobParametersValidator<PT> myParametersValidator; private IJobParametersValidator<PT> myParametersValidator;
private boolean myGatedExecution; private boolean myGatedExecution;
private IJobCompletionHandler<PT> myCompletionHandler; private IJobCompletionHandler<PT> myCompletionHandler;
private IJobCompletionHandler<PT> myErrorHandler;
Builder() { Builder() {
mySteps = new ArrayList<>(); mySteps = new ArrayList<>();
} }
Builder(List<JobDefinitionStep<PT, ?, ?>> theSteps, String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theJobParametersType, Class<NIT> theNextInputType, @Nullable IJobParametersValidator<PT> theParametersValidator, boolean theGatedExecution, IJobCompletionHandler<PT> theCompletionHandler) { Builder(List<JobDefinitionStep<PT, ?, ?>> theSteps, String theJobDefinitionId, int theJobDefinitionVersion, String theJobDescription, Class<PT> theJobParametersType, Class<NIT> theNextInputType, @Nullable IJobParametersValidator<PT> theParametersValidator, boolean theGatedExecution, IJobCompletionHandler<PT> theCompletionHandler, IJobCompletionHandler<PT> theErrorHandler) {
mySteps = theSteps; mySteps = theSteps;
myJobDefinitionId = theJobDefinitionId; myJobDefinitionId = theJobDefinitionId;
myJobDefinitionVersion = theJobDefinitionVersion; myJobDefinitionVersion = theJobDefinitionVersion;
@ -166,6 +169,7 @@ public class JobDefinition<PT extends IModelJson> {
myParametersValidator = theParametersValidator; myParametersValidator = theParametersValidator;
myGatedExecution = theGatedExecution; myGatedExecution = theGatedExecution;
myCompletionHandler = theCompletionHandler; myCompletionHandler = theCompletionHandler;
myErrorHandler = theErrorHandler;
} }
/** /**
@ -196,7 +200,7 @@ public class JobDefinition<PT extends IModelJson> {
*/ */
public <OT extends IModelJson> Builder<PT, OT> addFirstStep(String theStepId, String theStepDescription, Class<OT> theOutputType, IJobStepWorker<PT, VoidModel, OT> theStepWorker) { public <OT extends IModelJson> Builder<PT, OT> addFirstStep(String theStepId, String theStepDescription, Class<OT> theOutputType, IJobStepWorker<PT, VoidModel, OT> theStepWorker) {
mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, VoidModel.class, theOutputType)); mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, VoidModel.class, theOutputType));
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler); return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler, myErrorHandler);
} }
/** /**
@ -210,7 +214,7 @@ public class JobDefinition<PT extends IModelJson> {
*/ */
public <OT extends IModelJson> Builder<PT, OT> addIntermediateStep(String theStepId, String theStepDescription, Class<OT> theOutputType, IJobStepWorker<PT, NIT, OT> theStepWorker) { public <OT extends IModelJson> Builder<PT, OT> addIntermediateStep(String theStepId, String theStepDescription, Class<OT> theOutputType, IJobStepWorker<PT, NIT, OT> theStepWorker) {
mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, myNextInputType, theOutputType)); mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, myNextInputType, theOutputType));
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler); return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler, myErrorHandler);
} }
/** /**
@ -224,17 +228,17 @@ public class JobDefinition<PT extends IModelJson> {
*/ */
public Builder<PT, VoidModel> addLastStep(String theStepId, String theStepDescription, IJobStepWorker<PT, NIT, VoidModel> theStepWorker) { public Builder<PT, VoidModel> addLastStep(String theStepId, String theStepDescription, IJobStepWorker<PT, NIT, VoidModel> theStepWorker) {
mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, myNextInputType, VoidModel.class)); mySteps.add(new JobDefinitionStep<>(theStepId, theStepDescription, theStepWorker, myNextInputType, VoidModel.class));
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, VoidModel.class, myParametersValidator, myGatedExecution, myCompletionHandler); return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, VoidModel.class, myParametersValidator, myGatedExecution, myCompletionHandler, myErrorHandler);
} }
public <OT extends IModelJson> Builder<PT, OT> addFinalReducerStep(String theStepId, String theStepDescription, Class<OT> theOutputType, IReductionStepWorker<PT, NIT, OT> theStepWorker) { public <OT extends IModelJson> Builder<PT, OT> addFinalReducerStep(String theStepId, String theStepDescription, Class<OT> theOutputType, IReductionStepWorker<PT, NIT, OT> theStepWorker) {
mySteps.add(new JobDefinitionReductionStep<PT, NIT, OT>(theStepId, theStepDescription, theStepWorker, myNextInputType, theOutputType)); mySteps.add(new JobDefinitionReductionStep<PT, NIT, OT>(theStepId, theStepDescription, theStepWorker, myNextInputType, theOutputType));
return new Builder<PT, OT>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler); return new Builder<PT, OT>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler, myErrorHandler);
} }
public JobDefinition<PT> build() { public JobDefinition<PT> build() {
Validate.notNull(myJobParametersType, "No job parameters type was supplied"); Validate.notNull(myJobParametersType, "No job parameters type was supplied");
return new JobDefinition<>(myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, Collections.unmodifiableList(mySteps), myParametersValidator, myGatedExecution, myCompletionHandler); return new JobDefinition<>(myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, Collections.unmodifiableList(mySteps), myParametersValidator, myGatedExecution, myCompletionHandler, myErrorHandler);
} }
public Builder<PT, NIT> setJobDescription(String theJobDescription) { public Builder<PT, NIT> setJobDescription(String theJobDescription) {
@ -326,6 +330,14 @@ public class JobDefinition<PT extends IModelJson> {
return this; return this;
} }
/**
* Supplies an optional callback that will be invoked if the job fails
*/
public Builder<PT, NIT> errorHandler(IJobCompletionHandler<PT> theErrorHandler) {
Validate.isTrue(myErrorHandler == null, "Can not supply multiple error handlers");
myErrorHandler = theErrorHandler;
return this;
}
} }

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.batch2.model;
* #L% * #L%
*/ */
import ca.uhn.fhir.batch2.api.IJobInstance;
import ca.uhn.fhir.jpa.util.JsonDateDeserializer; import ca.uhn.fhir.jpa.util.JsonDateDeserializer;
import ca.uhn.fhir.jpa.util.JsonDateSerializer; import ca.uhn.fhir.jpa.util.JsonDateSerializer;
import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.IModelJson;
@ -34,7 +35,7 @@ import java.util.Date;
import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isBlank;
public class JobInstance extends JobInstanceStartRequest implements IModelJson { public class JobInstance extends JobInstanceStartRequest implements IModelJson, IJobInstance {
@JsonProperty(value = "jobDefinitionVersion") @JsonProperty(value = "jobDefinitionVersion")
private int myJobDefinitionVersion; private int myJobDefinitionVersion;
@ -130,6 +131,13 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
return instance; return instance;
} }
public static JobInstance fromInstanceId(String theInstanceId) {
JobInstance instance = new JobInstance();
instance.setInstanceId(theInstanceId);
return instance;
}
@Override
public String getCurrentGatedStepId() { public String getCurrentGatedStepId() {
return myCurrentGatedStepId; return myCurrentGatedStepId;
} }
@ -138,6 +146,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myCurrentGatedStepId = theCurrentGatedStepId; myCurrentGatedStepId = theCurrentGatedStepId;
} }
@Override
public int getErrorCount() { public int getErrorCount() {
return myErrorCount; return myErrorCount;
} }
@ -147,6 +156,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
return this; return this;
} }
@Override
public String getEstimatedTimeRemaining() { public String getEstimatedTimeRemaining() {
return myEstimatedTimeRemaining; return myEstimatedTimeRemaining;
} }
@ -155,6 +165,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myEstimatedTimeRemaining = theEstimatedTimeRemaining; myEstimatedTimeRemaining = theEstimatedTimeRemaining;
} }
@Override
public boolean isWorkChunksPurged() { public boolean isWorkChunksPurged() {
return myWorkChunksPurged; return myWorkChunksPurged;
} }
@ -163,6 +174,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myWorkChunksPurged = theWorkChunksPurged; myWorkChunksPurged = theWorkChunksPurged;
} }
@Override
public StatusEnum getStatus() { public StatusEnum getStatus() {
return myStatus; return myStatus;
} }
@ -172,6 +184,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
return this; return this;
} }
@Override
public int getJobDefinitionVersion() { public int getJobDefinitionVersion() {
return myJobDefinitionVersion; return myJobDefinitionVersion;
} }
@ -180,6 +193,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myJobDefinitionVersion = theJobDefinitionVersion; myJobDefinitionVersion = theJobDefinitionVersion;
} }
@Override
public String getInstanceId() { public String getInstanceId() {
return myInstanceId; return myInstanceId;
} }
@ -188,6 +202,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myInstanceId = theInstanceId; myInstanceId = theInstanceId;
} }
@Override
public Date getStartTime() { public Date getStartTime() {
return myStartTime; return myStartTime;
} }
@ -197,6 +212,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
return this; return this;
} }
@Override
public Date getEndTime() { public Date getEndTime() {
return myEndTime; return myEndTime;
} }
@ -206,6 +222,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
return this; return this;
} }
@Override
public Integer getCombinedRecordsProcessed() { public Integer getCombinedRecordsProcessed() {
return myCombinedRecordsProcessed; return myCombinedRecordsProcessed;
} }
@ -214,6 +231,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myCombinedRecordsProcessed = theCombinedRecordsProcessed; myCombinedRecordsProcessed = theCombinedRecordsProcessed;
} }
@Override
public Double getCombinedRecordsProcessedPerSecond() { public Double getCombinedRecordsProcessedPerSecond() {
return myCombinedRecordsProcessedPerSecond; return myCombinedRecordsProcessedPerSecond;
} }
@ -222,6 +240,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myCombinedRecordsProcessedPerSecond = theCombinedRecordsProcessedPerSecond; myCombinedRecordsProcessedPerSecond = theCombinedRecordsProcessedPerSecond;
} }
@Override
public Date getCreateTime() { public Date getCreateTime() {
return myCreateTime; return myCreateTime;
} }
@ -231,6 +250,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
return this; return this;
} }
@Override
public Integer getTotalElapsedMillis() { public Integer getTotalElapsedMillis() {
return myTotalElapsedMillis; return myTotalElapsedMillis;
} }
@ -239,6 +259,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myTotalElapsedMillis = theTotalElapsedMillis; myTotalElapsedMillis = theTotalElapsedMillis;
} }
@Override
public double getProgress() { public double getProgress() {
return myProgress; return myProgress;
} }
@ -247,6 +268,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myProgress = theProgress; myProgress = theProgress;
} }
@Override
public String getErrorMessage() { public String getErrorMessage() {
return myErrorMessage; return myErrorMessage;
} }
@ -263,10 +285,12 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
setJobDefinitionVersion(theJobDefinition.getJobDefinitionVersion()); setJobDefinitionVersion(theJobDefinition.getJobDefinitionVersion());
} }
@Override
public JobDefinition<?> getJobDefinition() { public JobDefinition<?> getJobDefinition() {
return myJobDefinition; return myJobDefinition;
} }
@Override
public boolean isCancelled() { public boolean isCancelled() {
return myCancelled; return myCancelled;
} }
@ -275,6 +299,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myCancelled = theCancelled; myCancelled = theCancelled;
} }
@Override
public String getReport() { public String getReport() {
return myReport; return myReport;
} }

View File

@ -20,13 +20,9 @@ package ca.uhn.fhir.batch2.progress;
* #L% * #L%
*/ */
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,8 +30,6 @@ import org.slf4j.LoggerFactory;
import java.util.Date; import java.util.Date;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
class InstanceProgress { class InstanceProgress {
private static final Logger ourLog = LoggerFactory.getLogger(InstanceProgress.class); private static final Logger ourLog = LoggerFactory.getLogger(InstanceProgress.class);
@ -111,15 +105,11 @@ class InstanceProgress {
theInstance.setErrorCount(myErrorCountForAllStatuses); theInstance.setErrorCount(myErrorCountForAllStatuses);
theInstance.setCombinedRecordsProcessed(myRecordsProcessed); theInstance.setCombinedRecordsProcessed(myRecordsProcessed);
boolean changedStatus = updateStatus(theInstance); updateStatus(theInstance);
setEndTime(theInstance); setEndTime(theInstance);
theInstance.setErrorMessage(myErrormessage); theInstance.setErrorMessage(myErrormessage);
if (changedStatus || theInstance.getStatus() == StatusEnum.IN_PROGRESS) {
ourLog.info("Job {} of type {} has status {} - {} records processed ({}/sec) - ETA: {}", theInstance.getInstanceId(), theInstance.getJobDefinitionId(), theInstance.getStatus(), theInstance.getCombinedRecordsProcessed(), theInstance.getCombinedRecordsProcessedPerSecond(), theInstance.getEstimatedTimeRemaining());
}
} }
private void setEndTime(JobInstance theInstance) { private void setEndTime(JobInstance theInstance) {
@ -132,21 +122,16 @@ class InstanceProgress {
} }
} }
private boolean updateStatus(JobInstance theInstance) { private void updateStatus(JobInstance theInstance) {
boolean changedStatus = false;
if (myCompleteChunkCount > 1 || myErroredChunkCount > 1) { if (myCompleteChunkCount > 1 || myErroredChunkCount > 1) {
double percentComplete = (double) (myCompleteChunkCount) / (double) (myIncompleteChunkCount + myCompleteChunkCount + myFailedChunkCount + myErroredChunkCount); double percentComplete = (double) (myCompleteChunkCount) / (double) (myIncompleteChunkCount + myCompleteChunkCount + myFailedChunkCount + myErroredChunkCount);
theInstance.setProgress(percentComplete); theInstance.setProgress(percentComplete);
if (jobSuccessfullyCompleted()) { if (jobSuccessfullyCompleted()) {
boolean completed = updateInstanceStatus(theInstance, StatusEnum.COMPLETED); theInstance.setStatus(StatusEnum.COMPLETED);
if (completed) {
invokeJobCompletionHandler(theInstance);
}
changedStatus = completed;
} else if (myErroredChunkCount > 0) { } else if (myErroredChunkCount > 0) {
changedStatus = updateInstanceStatus(theInstance, StatusEnum.ERRORED); theInstance.setStatus(StatusEnum.ERRORED);
} }
if (myEarliestStartTime != null && myLatestEndTime != null) { if (myEarliestStartTime != null && myLatestEndTime != null) {
@ -160,24 +145,12 @@ class InstanceProgress {
} }
} }
} }
return changedStatus;
} }
private boolean jobSuccessfullyCompleted() { private boolean jobSuccessfullyCompleted() {
return myIncompleteChunkCount == 0 && myErroredChunkCount == 0 && myFailedChunkCount == 0; return myIncompleteChunkCount == 0 && myErroredChunkCount == 0 && myFailedChunkCount == 0;
} }
private <PT extends IModelJson> void invokeJobCompletionHandler(JobInstance myInstance) {
JobDefinition<PT> definition = (JobDefinition<PT>) myInstance.getJobDefinition();
IJobCompletionHandler<PT> completionHandler = definition.getCompletionHandler();
if (completionHandler != null) {
String instanceId = myInstance.getInstanceId();
PT jobParameters = myInstance.getParameters(definition.getParametersType());
JobCompletionDetails<PT> completionDetails = new JobCompletionDetails<>(jobParameters, instanceId);
completionHandler.jobComplete(completionDetails);
}
}
public boolean failed() { public boolean failed() {
return myFailedChunkCount > 0; return myFailedChunkCount > 0;
} }

View File

@ -25,21 +25,23 @@ import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
public class JobInstanceProgressCalculator { public class JobInstanceProgressCalculator {
private static final Logger ourLog = LoggerFactory.getLogger(JobInstanceProgressCalculator.class);
private final IJobPersistence myJobPersistence; private final IJobPersistence myJobPersistence;
private final JobInstance myInstance; private final JobInstance myInstance;
private final JobChunkProgressAccumulator myProgressAccumulator; private final JobChunkProgressAccumulator myProgressAccumulator;
private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
public JobInstanceProgressCalculator(IJobPersistence theJobPersistence, JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) { public JobInstanceProgressCalculator(IJobPersistence theJobPersistence, JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
myJobPersistence = theJobPersistence; myJobPersistence = theJobPersistence;
myInstance = theInstance; myInstance = theInstance;
myProgressAccumulator = theProgressAccumulator; myProgressAccumulator = theProgressAccumulator;
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobPersistence);
} }
public void calculateAndStoreInstanceProgress() { public void calculateAndStoreInstanceProgress() {
@ -56,13 +58,17 @@ public class JobInstanceProgressCalculator {
instanceProgress.updateInstance(myInstance); instanceProgress.updateInstance(myInstance);
if (instanceProgress.failed()) { if (instanceProgress.failed()) {
updateInstanceStatus(myInstance, StatusEnum.FAILED); myJobInstanceStatusUpdater.updateInstanceStatus(myInstance, StatusEnum.FAILED);
myJobPersistence.updateInstance(myInstance);
return; return;
} }
if (instanceProgress.changed() || myInstance.getStatus() == StatusEnum.IN_PROGRESS) {
ourLog.info("Job {} of type {} has status {} - {} records processed ({}/sec) - ETA: {}", myInstance.getInstanceId(), myInstance.getJobDefinitionId(), myInstance.getStatus(), myInstance.getCombinedRecordsProcessed(), myInstance.getCombinedRecordsProcessedPerSecond(), myInstance.getEstimatedTimeRemaining());
}
if (instanceProgress.changed()) { if (instanceProgress.changed()) {
myJobPersistence.updateInstance(myInstance); myJobInstanceStatusUpdater.updateInstance(myInstance);
} }
} }
} }

View File

@ -0,0 +1,67 @@
package ca.uhn.fhir.batch2.progress;

import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.model.api.IModelJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Centralizes batch2 job-instance status transitions so that the per-definition
 * callbacks (completion handler on {@link StatusEnum#COMPLETED}, error handler on
 * FAILED/ERRORED/CANCELLED) are invoked at most once per transition, even when
 * several threads race to update the same instance.
 */
public class JobInstanceStatusUpdater {
	private static final Logger ourLog = LoggerFactory.getLogger(JobInstanceStatusUpdater.class);
	private final IJobPersistence myJobPersistence;

	public JobInstanceStatusUpdater(IJobPersistence theJobPersistence) {
		myJobPersistence = theJobPersistence;
	}

	/**
	 * Set the instance to the given status and persist it, firing the appropriate
	 * status-change callback if this thread's update actually changed the stored record.
	 *
	 * @param theJobInstance the instance to update (mutated in place)
	 * @param theNewStatus   the status to transition to
	 * @return true if the persisted status was changed by this call
	 */
	public boolean updateInstanceStatus(JobInstance theJobInstance, StatusEnum theNewStatus) {
		theJobInstance.setStatus(theNewStatus);
		return updateInstance(theJobInstance);
	}

	/**
	 * Persist the instance and, if the persistence layer reports that this update
	 * changed the record, invoke the status-change callback for its current status.
	 *
	 * @return true if the persisted record was changed by this call
	 */
	public boolean updateInstance(JobInstance theJobInstance) {
		boolean statusChanged = myJobPersistence.updateInstance(theJobInstance);

		// This code can be called by both the maintenance service and the fast-track work step executor.
		// We only want to call the completion handler if the status was changed to COMPLETED in this thread.
		// We rely on the record-changed count of the SQL status update so the database tells us in which
		// thread the status change actually happened.
		if (statusChanged) {
			ourLog.info("Marking job instance {} of type {} as {}", theJobInstance.getInstanceId(), theJobInstance.getJobDefinitionId(), theJobInstance.getStatus());
			handleStatusChange(theJobInstance);
		}
		return statusChanged;
	}

	// The parameter type PT is erased on JobInstance; the definition stored on the instance
	// is the one it was created from, so this cast is safe by construction.
	@SuppressWarnings("unchecked")
	private <PT extends IModelJson> void handleStatusChange(JobInstance theJobInstance) {
		JobDefinition<PT> definition = (JobDefinition<PT>) theJobInstance.getJobDefinition();
		switch (theJobInstance.getStatus()) {
			case COMPLETED:
				invokeCompletionHandler(theJobInstance, definition, definition.getCompletionHandler());
				break;
			case FAILED:
			case ERRORED:
			case CANCELLED:
				// All three terminal non-success states route to the same optional error handler
				invokeCompletionHandler(theJobInstance, definition, definition.getErrorHandler());
				break;
			case QUEUED:
			case IN_PROGRESS:
			default:
				// do nothing: non-terminal states have no callback
		}
	}

	/**
	 * Invoke the given handler (completion or error) with a details object built from
	 * the instance's parameters. A null handler means the job definition did not
	 * register one, which is legal — silently skip.
	 */
	private <PT extends IModelJson> void invokeCompletionHandler(JobInstance theJobInstance, JobDefinition<PT> theJobDefinition, IJobCompletionHandler<PT> theJobCompletionHandler) {
		if (theJobCompletionHandler == null) {
			return;
		}
		PT jobParameters = theJobInstance.getParameters(theJobDefinition.getParametersType());
		JobCompletionDetails<PT> completionDetails = new JobCompletionDetails<>(jobParameters, theJobInstance);
		theJobCompletionHandler.jobComplete(completionDetails);
	}
}

View File

@ -49,8 +49,7 @@ public abstract class BaseBatch2Test {
} }
static JobInstance createInstance(String theJobId) { static JobInstance createInstance(String theJobId) {
JobInstance instance = new JobInstance(); JobInstance instance = JobInstance.fromInstanceId(INSTANCE_ID);
instance.setInstanceId(INSTANCE_ID);
instance.setStatus(StatusEnum.IN_PROGRESS); instance.setStatus(StatusEnum.IN_PROGRESS);
instance.setJobDefinitionId(theJobId); instance.setJobDefinitionId(theJobId);
instance.setJobDefinitionVersion(1); instance.setJobDefinitionVersion(1);

View File

@ -10,12 +10,12 @@ import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor; import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil; import ca.uhn.fhir.util.JsonUtil;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -23,6 +23,7 @@ import org.mockito.Captor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import javax.annotation.Nonnull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -58,9 +59,9 @@ class JobDataSinkTest {
// setup // setup
IJobStepWorker<TestJobParameters, VoidModel, Step1Output> firstStepWorker = new IJobStepWorker<>() { IJobStepWorker<TestJobParameters, VoidModel, Step1Output> firstStepWorker = new IJobStepWorker<>() {
@NotNull @Nonnull
@Override @Override
public RunOutcome run(@NotNull StepExecutionDetails<TestJobParameters, VoidModel> theStepExecutionDetails, @NotNull IJobDataSink<Step1Output> theDataSink) throws JobExecutionFailedException { public RunOutcome run(@Nonnull StepExecutionDetails<TestJobParameters, VoidModel> theStepExecutionDetails, @Nonnull IJobDataSink<Step1Output> theDataSink) throws JobExecutionFailedException {
TestJobParameters params = theStepExecutionDetails.getParameters(); TestJobParameters params = theStepExecutionDetails.getParameters();
int numPidsToGenerate = Integer.parseInt(params.getParam1()); int numPidsToGenerate = Integer.parseInt(params.getParam1());
Step1Output output = new Step1Output(); Step1Output output = new Step1Output();
@ -93,7 +94,8 @@ class JobDataSinkTest {
// execute // execute
// Let's test our first step worker by calling run on it: // Let's test our first step worker by calling run on it:
when(myJobPersistence.storeWorkChunk(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID); when(myJobPersistence.storeWorkChunk(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID);
StepExecutionDetails<TestJobParameters, VoidModel> details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, JOB_INSTANCE_ID, CHUNK_ID); JobInstance instance = JobInstance.fromInstanceId(JOB_INSTANCE_ID);
StepExecutionDetails<TestJobParameters, VoidModel> details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, instance, CHUNK_ID);
JobWorkCursor<TestJobParameters, VoidModel, Step1Output> cursor = new JobWorkCursor<>(job, true, firstStep, lastStep); JobWorkCursor<TestJobParameters, VoidModel, Step1Output> cursor = new JobWorkCursor<>(job, true, firstStep, lastStep);
JobDataSink<TestJobParameters, VoidModel, Step1Output> sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, job, JOB_INSTANCE_ID, cursor); JobDataSink<TestJobParameters, VoidModel, Step1Output> sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, job, JOB_INSTANCE_ID, cursor);

View File

@ -231,6 +231,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance())); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())) when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenReturn(chunks.iterator()); .thenReturn(chunks.iterator());
when(myJobPersistence.updateInstance(any())).thenReturn(true);
// Execute // Execute
@ -252,7 +253,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
verifyNoMoreInteractions(myJobPersistence); verifyNoMoreInteractions(myJobPersistence);
assertEquals(INSTANCE_ID, myJobCompletionCaptor.getValue().getInstanceId()); assertEquals(INSTANCE_ID, myJobCompletionCaptor.getValue().getInstance().getInstanceId());
assertEquals(PARAM_1_VALUE, myJobCompletionCaptor.getValue().getParameters().getParam1()); assertEquals(PARAM_1_VALUE, myJobCompletionCaptor.getValue().getParameters().getParam1());
} }

View File

@ -41,7 +41,7 @@ class JobQuerySvcTest extends BaseBatch2Test {
JobDefinition<?> definition = createJobDefinition(); JobDefinition<?> definition = createJobDefinition();
JobInstance instance = createInstance(); JobInstance instance = createInstance();
doReturn(definition).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1)); doReturn(definition).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(instance);
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance)); when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance));
// Execute // Execute
@ -60,8 +60,9 @@ class JobQuerySvcTest extends BaseBatch2Test {
// Setup // Setup
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1)); JobInstance instance = createInstance();
when(myJobPersistence.fetchInstances(eq(100), eq(0))).thenReturn(Lists.newArrayList(createInstance())); when(myJobPersistence.fetchInstances(eq(100), eq(0))).thenReturn(Lists.newArrayList(instance));
doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(instance);
// Execute // Execute

View File

@ -91,7 +91,7 @@ public class ReductionStepDataSinkTest {
// when // when
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))) when(myJobPersistence.fetchInstance(eq(INSTANCE_ID)))
.thenReturn(Optional.of(createInstance())); .thenReturn(Optional.of(JobInstance.fromInstanceId(INSTANCE_ID)));
// test // test
myDataSink.accept(chunkData); myDataSink.accept(chunkData);
@ -116,7 +116,7 @@ public class ReductionStepDataSinkTest {
// when // when
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))) when(myJobPersistence.fetchInstance(eq(INSTANCE_ID)))
.thenReturn(Optional.of(createInstance())); .thenReturn(Optional.of(JobInstance.fromInstanceId(INSTANCE_ID)));
// test // test
myDataSink.accept(firstData); myDataSink.accept(firstData);
@ -153,9 +153,4 @@ public class ReductionStepDataSinkTest {
} }
} }
private JobInstance createInstance() {
JobInstance instance = new JobInstance();
instance.setInstanceId(INSTANCE_ID);
return instance;
}
} }

View File

@ -546,8 +546,7 @@ public class StepExecutionSvcTest {
} }
private JobInstance getTestJobInstance() { private JobInstance getTestJobInstance() {
JobInstance instance = new JobInstance(); JobInstance instance = JobInstance.fromInstanceId(INSTANCE_ID);
instance.setInstanceId(INSTANCE_ID);
instance.setParameters(new TestJobParameters()); instance.setParameters(new TestJobParameters());
return instance; return instance;

View File

@ -0,0 +1,126 @@
package ca.uhn.fhir.batch2.progress;

import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobInstance;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.concurrent.atomic.AtomicReference;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link JobInstanceStatusUpdater}: verifies the completion handler
 * fires on a COMPLETED transition and the error handler fires on each of the
 * ERRORED / FAILED / CANCELLED transitions, with the details object carrying the
 * instance's parameters and read-only instance state.
 */
@ExtendWith(MockitoExtension.class)
class JobInstanceStatusUpdaterTest {
	private static final String TEST_INSTANCE_ID = "test-instance-id";
	private static final String TEST_NAME = "test name";
	private static final String TEST_ERROR_MESSAGE = "test error message";
	private static final int TEST_ERROR_COUNT = 729;
	@Mock
	IJobPersistence myJobPersistence;
	@Mock
	private JobDefinition<TestParameters> myJobDefinition;
	// Constructed by Mockito with myJobPersistence injected
	@InjectMocks
	JobInstanceStatusUpdater mySvc;
	private JobInstance myInstance;
	private TestParameters myTestParameters;
	// Captures the JobCompletionDetails passed to the error handler, for later assertions
	private AtomicReference<JobCompletionDetails> myDetails;

	@BeforeEach
	public void before() {
		// Build an in-progress instance with known parameters and error state
		myInstance = JobInstance.fromInstanceId(TEST_INSTANCE_ID);
		myInstance.setStatus(StatusEnum.IN_PROGRESS);
		myInstance.setJobDefinition(myJobDefinition);
		myTestParameters = new TestParameters();
		myTestParameters.name = TEST_NAME;
		myInstance.setParameters(myTestParameters);
		myInstance.setErrorMessage(TEST_ERROR_MESSAGE);
		myInstance.setErrorCount(TEST_ERROR_COUNT);
		when(myJobDefinition.getParametersType()).thenReturn(TestParameters.class);
	}

	@Test
	public void testCompletionHandler() {
		AtomicReference<JobCompletionDetails> calledDetails = new AtomicReference<>();

		// setup
		// Persistence reports the status actually changed, so the callback must fire
		when(myJobPersistence.updateInstance(myInstance)).thenReturn(true);
		IJobCompletionHandler<TestParameters> completionHandler = details -> calledDetails.set(details);
		when(myJobDefinition.getCompletionHandler()).thenReturn(completionHandler);

		// execute
		mySvc.updateInstanceStatus(myInstance, StatusEnum.COMPLETED);

		// The handler received the instance id and the job parameters
		JobCompletionDetails<TestParameters> receivedDetails = calledDetails.get();
		assertEquals(TEST_INSTANCE_ID, receivedDetails.getInstance().getInstanceId());
		assertEquals(TEST_NAME, receivedDetails.getParameters().name);
	}

	@Test
	public void testErrorHandler_ERROR() {
		setupErrorCallback();

		// execute
		mySvc.updateInstanceStatus(myInstance, StatusEnum.ERRORED);

		assertErrorCallbackCalled(StatusEnum.ERRORED);
	}

	@Test
	public void testErrorHandler_FAILED() {
		setupErrorCallback();

		// execute
		mySvc.updateInstanceStatus(myInstance, StatusEnum.FAILED);

		assertErrorCallbackCalled(StatusEnum.FAILED);
	}

	@Test
	public void testErrorHandler_CANCELLED() {
		setupErrorCallback();

		// execute
		mySvc.updateInstanceStatus(myInstance, StatusEnum.CANCELLED);

		assertErrorCallbackCalled(StatusEnum.CANCELLED);
	}

	// Asserts the error handler saw the expected status plus the instance's parameters and error state
	private void assertErrorCallbackCalled(StatusEnum expectedStatus) {
		JobCompletionDetails<TestParameters> receivedDetails = myDetails.get();
		assertEquals(TEST_NAME, receivedDetails.getParameters().name);

		IJobInstance instance = receivedDetails.getInstance();
		assertEquals(TEST_INSTANCE_ID, instance.getInstanceId());
		assertEquals(TEST_ERROR_MESSAGE, instance.getErrorMessage());
		assertEquals(TEST_ERROR_COUNT, instance.getErrorCount());
		assertEquals(expectedStatus, instance.getStatus());
	}

	// Common stubbing for the three error-path tests: persistence reports a change,
	// and the definition supplies an error handler that records its details argument
	private void setupErrorCallback() {
		myDetails = new AtomicReference<>();

		// setup
		when(myJobPersistence.updateInstance(myInstance)).thenReturn(true);
		IJobCompletionHandler<TestParameters> errorHandler = details -> myDetails.set(details);
		when(myJobDefinition.getErrorHandler()).thenReturn(errorHandler);
	}

	// Minimal parameters model used by the mocked job definition
	static class TestParameters implements IModelJson {
		@JsonProperty
		public String name;
	}
}

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -93,7 +93,7 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
myRequestDetails = theRequestDetails; myRequestDetails = theRequestDetails;
myTransactionDetails = theTransactionDetails; myTransactionDetails = theTransactionDetails;
myData = theStepExecutionDetails.getData(); myData = theStepExecutionDetails.getData();
myInstanceId = theStepExecutionDetails.getInstanceId(); myInstanceId = theStepExecutionDetails.getInstance().getInstanceId();
myChunkId = theStepExecutionDetails.getChunkId(); myChunkId = theStepExecutionDetails.getChunkId();
} }

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,6 @@ import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.test.concurrency.PointcutLatch; import ca.uhn.test.concurrency.PointcutLatch;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -13,6 +12,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import javax.annotation.Nonnull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -100,7 +100,7 @@ class LinkedBlockingChannelFactoryTest {
assertEquals(2, myFailureCount); assertEquals(2, myFailureCount);
} }
@NotNull @Nonnull
private Runnable failTwiceThenProceed() { private Runnable failTwiceThenProceed() {
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE6-SNAPSHOT</version> <version>6.1.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

Some files were not shown because too many files have changed in this diff Show More