Improve batch2 error handling (#3482)
* Improve batch2 error handling
* Build fixes
* Work on reindexing
* Test fixes
* Account for gzipped contents
* License header updates
* Register VS operations on public server
* Test fix
* Build fix
* Remove animal sniffer plugin
* Compile fix
This commit is contained in:
parent 625ed936e9
commit b833c13a3c
@@ -10,8 +10,10 @@ HAPI FHIR - Java API for HL7 FHIR Clients and Servers
 | :---: | :---: | :---: |
 | [![Build Status][Badge-AzurePipelineMaster]][Link-AzurePipelinesMaster] | [![Build Status][Badge-AzureReleaseSnapshot]][Link-AzurePipelinesSnapshot] | [![Release Artifacts][Badge-MavenCentral]][Link-MavenCentral] |
 
-## Test Coverage
+## Coverage and Quality
 
 [![codecov][Badge-CodeCov]][Link-CodeCov]
+
+[![Language grade: Java](https://img.shields.io/lgtm/grade/java/g/hapifhir/hapi-fhir.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/hapifhir/hapi-fhir/context:java)
 
 ## Documentation and wiki
@@ -18,27 +18,6 @@
 	<build>
 		<plugins>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>animal-sniffer-maven-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>check-java-api</id>
-						<phase>test</phase>
-						<inherited>true</inherited>
-						<goals>
-							<goal>check</goal>
-						</goals>
-						<configuration>
-							<signature>
-								<groupId>org.codehaus.mojo.signature</groupId>
-								<artifactId>java18</artifactId>
-								<version>1.0</version>
-							</signature>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
 			<plugin>
 				<groupId>org.basepom.maven</groupId>
 				<artifactId>duplicate-finder-maven-plugin</artifactId>
@@ -196,7 +175,7 @@
 				</links>
-				<additionalparam>-Xdoclint:none</additionalparam>
-				<source>8</source>
+				<additionalJOption>-Xdoclint:none</additionalJOption>
+				<source>11</source>
 			</configuration>
 		</reportSet>
 	</reportSets>
@@ -220,7 +199,7 @@
 					<verbose>false</verbose>
 					<debug>false</debug>
 					<additionalJOption>-Xdoclint:none</additionalJOption>
-					<source>8</source>
+					<source>11</source>
 				</configuration>
 				<executions>
 					<execution>
@@ -91,4 +91,9 @@ public interface IBaseInterceptorService<POINTCUT extends IPointcut> extends IBa
 	 * Unregisters all interceptors that are indicated by the given callback function returning <code>true</code>
 	 */
 	void unregisterInterceptorsIf(Predicate<Object> theShouldUnregisterFunction);
+
+	/**
+	 * Unregisters all anonymous interceptors (i.e. all interceptors registered with <code>registerAnonymousInterceptor</code>)
+	 */
+	void unregisterAllAnonymousInterceptors();
 }
@@ -163,6 +163,13 @@ public abstract class BaseInterceptorService<POINTCUT extends IPointcut> impleme
 		}
 	}
 
+	@Override
+	public void unregisterAllAnonymousInterceptors() {
+		synchronized (myRegistryMutex) {
+			unregisterInterceptorsIf(t -> true, myAnonymousInvokers);
+		}
+	}
+
 	@Override
 	public void unregisterInterceptorsIf(Predicate<Object> theShouldUnregisterFunction) {
 		unregisterInterceptorsIf(theShouldUnregisterFunction, myGlobalInvokers);
@@ -170,7 +177,9 @@ public abstract class BaseInterceptorService<POINTCUT extends IPointcut> impleme
 	}
 
 	private void unregisterInterceptorsIf(Predicate<Object> theShouldUnregisterFunction, ListMultimap<POINTCUT, BaseInvoker> theGlobalInvokers) {
-		theGlobalInvokers.entries().removeIf(t -> theShouldUnregisterFunction.test(t.getValue().getInterceptor()));
+		synchronized (myRegistryMutex) {
+			theGlobalInvokers.entries().removeIf(t -> theShouldUnregisterFunction.test(t.getValue().getInterceptor()));
+		}
 	}
 
 	@Override
@@ -476,6 +485,32 @@ public abstract class BaseInterceptorService<POINTCUT extends IPointcut> impleme
 
 	protected abstract Optional<HookDescriptor> scanForHook(Method nextMethod);
 
+	protected static <T extends Annotation> Optional<T> findAnnotation(AnnotatedElement theObject, Class<T> theHookClass) {
+		T annotation;
+		if (theObject instanceof Method) {
+			annotation = MethodUtils.getAnnotation((Method) theObject, theHookClass, true, true);
+		} else {
+			annotation = theObject.getAnnotation(theHookClass);
+		}
+		return Optional.ofNullable(annotation);
+	}
+
+	private static int determineOrder(Class<?> theInterceptorClass) {
+		int typeOrder = Interceptor.DEFAULT_ORDER;
+		Optional<Interceptor> typeOrderAnnotation = findAnnotation(theInterceptorClass, Interceptor.class);
+		if (typeOrderAnnotation.isPresent()) {
+			typeOrder = typeOrderAnnotation.get().order();
+		}
+		return typeOrder;
+	}
+
+	private static String toErrorString(List<String> theParameterTypes) {
+		return theParameterTypes
+			.stream()
+			.sorted()
+			.collect(Collectors.joining(","));
+	}
+
 	protected abstract static class BaseInvoker implements Comparable<BaseInvoker> {
 
 		private final int myOrder;
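Note: these helpers are hoisted up the class here; the matching removal from their old position follows below. The `MethodUtils.getAnnotation(method, type, searchSupers, ignoreAccess)` call from Apache Commons Lang is what lets hook annotations be found on overridden methods. A self-contained sketch of that behavior, with hypothetical `Base`/`Child` classes:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import org.apache.commons.lang3.reflect.MethodUtils;

public class AnnotationLookupSketch {
	@Retention(RetentionPolicy.RUNTIME)
	@interface Hook {}

	static class Base {
		@Hook
		public void onEvent() {}
	}

	static class Child extends Base {
		@Override
		public void onEvent() {} // no annotation on the override itself
	}

	public static void main(String[] args) throws Exception {
		Method m = Child.class.getMethod("onEvent");
		// Plain reflection misses the annotation declared on the superclass method...
		System.out.println(m.getAnnotation(Hook.class)); // null
		// ...while MethodUtils also walks superclasses and interfaces
		System.out.println(MethodUtils.getAnnotation(m, Hook.class, true, true)); // non-null
	}
}
```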
@@ -605,30 +640,4 @@ public abstract class BaseInterceptorService<POINTCUT extends IPointcut> impleme
 
 	}
 
-	protected static <T extends Annotation> Optional<T> findAnnotation(AnnotatedElement theObject, Class<T> theHookClass) {
-		T annotation;
-		if (theObject instanceof Method) {
-			annotation = MethodUtils.getAnnotation((Method) theObject, theHookClass, true, true);
-		} else {
-			annotation = theObject.getAnnotation(theHookClass);
-		}
-		return Optional.ofNullable(annotation);
-	}
-
-	private static int determineOrder(Class<?> theInterceptorClass) {
-		int typeOrder = Interceptor.DEFAULT_ORDER;
-		Optional<Interceptor> typeOrderAnnotation = findAnnotation(theInterceptorClass, Interceptor.class);
-		if (typeOrderAnnotation.isPresent()) {
-			typeOrder = typeOrderAnnotation.get().order();
-		}
-		return typeOrder;
-	}
-
-	private static String toErrorString(List<String> theParameterTypes) {
-		return theParameterTypes
-			.stream()
-			.sorted()
-			.collect(Collectors.joining(","));
-	}
-
 }
@@ -20,8 +20,8 @@ package ca.uhn.fhir.cli;
  * #L%
  */
 
-import ca.uhn.fhir.batch2.jobs.imprt.BulkImportFileServlet;
+import ca.uhn.fhir.batch2.jobs.imprt.BulkDataImportProvider;
+import ca.uhn.fhir.batch2.jobs.imprt.BulkImportFileServlet;
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.i18n.Msg;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
@@ -32,8 +32,12 @@ import ca.uhn.fhir.util.ParametersUtil;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
 import org.apache.commons.io.LineIterator;
+import org.apache.commons.io.file.PathUtils;
+import org.apache.commons.io.filefilter.FileFileFilter;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.io.filefilter.SuffixFileFilter;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -47,12 +51,20 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import java.io.File;
-import java.io.FileReader;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitOption;
+import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.ExecutionException;
+import java.util.zip.GZIPInputStream;
 
 public class BulkImportCommand extends BaseCommand {
 
@@ -86,7 +98,7 @@ public class BulkImportCommand extends BaseCommand {
 		addFhirVersionOption(options);
 		addRequiredOption(options, null, PORT, PORT, "The port to listen on. If set to 0, an available free port will be selected.");
 		addOptionalOption(options, null, SOURCE_BASE, "base url", "The URL to advertise as the base URL for accessing the files (i.e. this is the address that this command will declare that it is listening on). If not present, the server will default to \"http://localhost:[port]\" which will only work if the server is on the same host.");
-		addRequiredOption(options, null, SOURCE_DIRECTORY, "directory", "The source directory. This directory will be scanned for files with an extension of .json or .ndjson and any files in this directory will be assumed to be NDJSON and uploaded. This command will read the first resource from each file to verify its resource type, and will assume that all resources in the file are of the same type.");
+		addRequiredOption(options, null, SOURCE_DIRECTORY, "directory", "The source directory. This directory will be scanned for files with an extensions of .json, .ndjson, .json.gz and .ndjson.gz, and any files in this directory will be assumed to be NDJSON and uploaded. This command will read the first resource from each file to verify its resource type, and will assume that all resources in the file are of the same type.");
 		addRequiredOption(options, null, TARGET_BASE, "base url", "The base URL of the target FHIR server.");
 		addBasicAuthOption(options);
 		return options;
@@ -169,7 +181,18 @@ public class BulkImportCommand extends BaseCommand {
 
 		myServlet = new BulkImportFileServlet();
 		for (File t : files) {
-			BulkImportFileServlet.IFileSupplier fileSupplier = () -> new FileReader(t);
+
+			BulkImportFileServlet.IFileSupplier fileSupplier = new BulkImportFileServlet.IFileSupplier() {
+				@Override
+				public boolean isGzip() {
+					return t.getName().toLowerCase(Locale.ROOT).endsWith(".gz");
+				}
+
+				@Override
+				public InputStream get() throws IOException {
+					return new FileInputStream(t);
+				}
+			};
 			indexes.add(myServlet.registerFile(fileSupplier));
 		}
 
@@ -195,18 +218,28 @@ public class BulkImportCommand extends BaseCommand {
 	private void scanDirectoryForJsonFiles(String baseDirectory, List<String> types, List<File> files) {
 		try {
 			File directory = new File(baseDirectory);
-			FileUtils
-				.streamFiles(directory, false, "json", "ndjson", "JSON", "NDJSON")
+			final String[] extensions = new String[]{".json", ".ndjson", ".json.gz", ".ndjson.gz"};
+			final IOFileFilter filter = FileFileFilter.INSTANCE.and(new SuffixFileFilter(extensions, IOCase.INSENSITIVE));
+			PathUtils
+				.walk(directory.toPath(), filter, 1, false, FileVisitOption.FOLLOW_LINKS)
+				.map(Path::toFile)
 				.filter(t -> t.isFile())
 				.filter(t -> t.exists())
 				.forEach(t -> files.add(t));
 			if (files.isEmpty()) {
-				throw new CommandFailureException(Msg.code(2058) + "No .json/.ndjson files found in directory: " + directory.getAbsolutePath());
+				throw new CommandFailureException(Msg.code(2058) + "No files found in directory \"" + directory.getAbsolutePath() + "\". Allowed extensions: " + Arrays.asList(extensions));
 			}
 
 			FhirContext ctx = getFhirContext();
 			for (File next : files) {
-				try (Reader reader = new FileReader(next)) {
+				try (InputStream nextIs = new FileInputStream(next)) {
+					InputStream is;
+					if (next.getName().toLowerCase(Locale.ROOT).endsWith(".gz")) {
+						is = new GZIPInputStream(nextIs);
+					} else {
+						is = nextIs;
+					}
+					Reader reader = new InputStreamReader(is, StandardCharsets.UTF_8);
 					LineIterator lineIterator = new LineIterator(reader);
 					String firstLine = lineIterator.next();
 					IBaseResource resource = ctx.newJsonParser().parseResource(firstLine);
@@ -25,12 +25,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileWriter;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.zip.GZIPOutputStream;
 
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.equalTo;
@@ -111,6 +115,40 @@ public class BulkImportCommandTest {
 		assertEquals(fileContents1, fetchFile(jobParameters.getNdJsonUrls().get(1)));
 	}
 
+	@Test
+	public void testBulkImport_GzippedFile() throws IOException {
+
+		String fileContents1 = "{\"resourceType\":\"Observation\"}\n{\"resourceType\":\"Observation\"}";
+		String fileContents2 = "{\"resourceType\":\"Patient\"}\n{\"resourceType\":\"Patient\"}";
+		writeNdJsonFileToTempDirectory(fileContents1, "file1.json.gz");
+		writeNdJsonFileToTempDirectory(fileContents2, "file2.json.gz");
+
+		when(myJobCoordinator.startInstance(any())).thenReturn("THE-JOB-ID");
+
+		// Start the command in a separate thread
+		new Thread(() -> App.main(new String[]{
+			BulkImportCommand.BULK_IMPORT,
+			"--" + BaseCommand.FHIR_VERSION_PARAM_LONGOPT, "r4",
+			"--" + BulkImportCommand.PORT, "0",
+			"--" + BulkImportCommand.SOURCE_DIRECTORY, myTempDir.toAbsolutePath().toString(),
+			"--" + BulkImportCommand.TARGET_BASE, myRestfulServerExtension.getBaseUrl()
+		})).start();
+
+		ourLog.info("Waiting for initiation requests");
+		await().until(() -> myRestfulServerExtension.getRequestContentTypes().size(), equalTo(2));
+		ourLog.info("Initiation requests complete");
+
+		verify(myJobCoordinator, timeout(10000).times(1)).startInstance(myStartCaptor.capture());
+
+		JobInstanceStartRequest startRequest = myStartCaptor.getValue();
+		BulkImportJobParameters jobParameters = startRequest.getParameters(BulkImportJobParameters.class);
+
+		// Reverse order because Patient should be first
+		assertEquals(2, jobParameters.getNdJsonUrls().size());
+		assertEquals(fileContents2, fetchFile(jobParameters.getNdJsonUrls().get(0)));
+		assertEquals(fileContents1, fetchFile(jobParameters.getNdJsonUrls().get(1)));
+	}
+
 	private String fetchFile(String url) throws IOException {
 		String outcome;
 		try (CloseableHttpResponse response = myHttpClientExtension.getClient().execute(new HttpGet(url))) {
@@ -120,9 +158,15 @@ public class BulkImportCommandTest {
 		return outcome;
 	}
 
-	private void writeNdJsonFileToTempDirectory(String fileContents1, String fileName) throws IOException {
-		try (Writer w = new FileWriter(new File(myTempDir.toFile(), fileName))) {
-			w.append(fileContents1);
+	private void writeNdJsonFileToTempDirectory(String theContents, String theFileName) throws IOException {
+		try (FileOutputStream fos = new FileOutputStream(new File(myTempDir.toFile(), theFileName), false)) {
+			OutputStream os = fos;
+			if (theFileName.endsWith(".gz")) {
+				os = new GZIPOutputStream(os);
+			}
+			try (Writer w = new OutputStreamWriter(os)) {
+				w.append(theContents);
+			}
 		}
 	}
 
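Note: the test helper above gzips on the fly when the file name ends in `.gz`. A self-contained round-trip sketch of the same pattern, using a hypothetical temp file rather than the test's directory:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTrip {
	public static void main(String[] args) throws IOException {
		File file = File.createTempFile("file1", ".ndjson.gz"); // hypothetical temp file

		// Write: wrap the file stream in a GZIPOutputStream before the Writer
		try (Writer w = new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(file)), StandardCharsets.UTF_8)) {
			w.append("{\"resourceType\":\"Patient\"}");
		}

		// Read: mirror the wrapping on the way back in
		try (BufferedReader r = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8))) {
			System.out.println(r.readLine()); // {"resourceType":"Patient"}
		}
	}
}
```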
@@ -126,13 +126,6 @@
 
 	<build>
 		<plugins>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>animal-sniffer-maven-plugin</artifactId>
-				<configuration>
-					<skip>true</skip>
-				</configuration>
-			</plugin>
 			<plugin>
 				<groupId>org.basepom.maven</groupId>
 				<artifactId>duplicate-finder-maven-plugin</artifactId>
@@ -0,0 +1,7 @@
+---
+type: perf
+issue: 3482
+title: "When updating or reindexing a resource on a JPA server where indexing search param presence is
+  enabled (i.e. support for the `:missing` modifier), updates will try to reuse existing database rows
+  where possible instead of deleting one row and adding another. This should cause a slight performance
+  boost on systems with this feature enabled."
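Note: for context, the `:missing` modifier this changelog refers to matches resources by the absence or presence of a search parameter. A hedged client-side sketch using HAPI's generic client; the server base URL is hypothetical:

```java
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Patient;

public class MissingModifierExample {
	public static void main(String[] args) {
		FhirContext ctx = FhirContext.forR4();
		IGenericClient client = ctx.newRestfulGenericClient("http://localhost:8000/fhir"); // hypothetical base URL

		// Equivalent to GET [base]/Patient?organization:missing=true,
		// i.e. Patients with no managingOrganization reference at all.
		Bundle bundle = client.search()
			.forResource(Patient.class)
			.where(Patient.ORGANIZATION.isMissing(true))
			.returnBundle(Bundle.class)
			.execute();

		System.out.println("Matches: " + bundle.getTotal());
	}
}
```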
@@ -355,7 +355,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
 			}
 		}
 
-		// Populate the resource with it's actual final stored ID from the entity
+		// Populate the resource with its actual final stored ID from the entity
 		theResource.setId(entity.getIdDt());
 
 		// Pre-cache the resource ID
@@ -1,5 +1,6 @@
 package ca.uhn.fhir.jpa.dao;
 
+import ca.uhn.fhir.jpa.api.config.DaoConfig;
 import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
 import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
 import ca.uhn.fhir.jpa.api.model.ExpungeOutcome;
@@ -193,6 +194,11 @@ public abstract class BaseHapiFhirSystemDao<T extends IBaseBundle, MT> extends B
 			preFetchIndexes(entityIds, "tags", "myTags", null);
 		}
 
+		entityIds = loadedResourceTableEntries.stream().map(t->t.getId()).collect(Collectors.toList());
+		if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.ENABLED) {
+			preFetchIndexes(entityIds, "searchParamPresence", "mySearchParamPresents", null);
+		}
+
 		new QueryChunker<ResourceTable>().chunk(loadedResourceTableEntries, SearchBuilder.getMaximumPageSize() / 2, entries -> {
 
 			Map<Long, ResourceTable> entities = entries
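Note: pre-fetching the `mySearchParamPresents` collection here avoids an N+1 lazy-load when each resource's presence rows are touched later in the transaction. A generic JPA sketch of the same idea; the entity names are illustrative, not HAPI's:

```java
import javax.persistence.*;
import java.util.ArrayList;
import java.util.List;

@Entity
class ParentEntity {
	@Id Long id;
	@OneToMany(mappedBy = "parent", fetch = FetchType.LAZY)
	List<ChildEntity> children = new ArrayList<>();
}

@Entity
class ChildEntity {
	@Id Long id;
	@ManyToOne ParentEntity parent;
}

public class PrefetchSketch {
	// Without a fetch join, iterating parents lazily loads children one query per parent (N+1).
	// A single LEFT JOIN FETCH pulls parents and children in one round trip instead.
	static List<ParentEntity> loadWithChildren(EntityManager em, List<Long> ids) {
		return em.createQuery(
				"SELECT DISTINCT p FROM ParentEntity p LEFT JOIN FETCH p.children WHERE p.id IN :ids",
				ParentEntity.class)
			.setParameter("ids", ids)
			.getResultList();
	}
}
```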
@@ -1,7 +1,7 @@
 package ca.uhn.fhir.jpa.dao.data;
 
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
-import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
+import ca.uhn.fhir.jpa.model.entity.SearchParamPresentEntity;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
@@ -29,13 +29,13 @@ import java.util.List;
  * #L%
  */
 
-public interface ISearchParamPresentDao extends JpaRepository<SearchParamPresent, Long>, IHapiFhirJpaRepository {
+public interface ISearchParamPresentDao extends JpaRepository<SearchParamPresentEntity, Long>, IHapiFhirJpaRepository {
 
-	@Query("SELECT s FROM SearchParamPresent s WHERE s.myResource = :res")
-	List<SearchParamPresent> findAllForResource(@Param("res") ResourceTable theResource);
+	@Query("SELECT s FROM SearchParamPresentEntity s WHERE s.myResource = :res")
+	List<SearchParamPresentEntity> findAllForResource(@Param("res") ResourceTable theResource);
 
 	@Modifying
-	@Query("delete from SearchParamPresent t WHERE t.myResourcePid = :resid")
+	@Query("delete from SearchParamPresentEntity t WHERE t.myResourcePid = :resid")
 	void deleteByResourceId(@Param("resid") Long theResourcePid);
 
 }
|
@ -65,7 +65,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamUri;
|
|||
import ca.uhn.fhir.jpa.model.entity.ResourceLink;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceTag;
|
||||
import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
|
||||
import ca.uhn.fhir.jpa.model.entity.SearchParamPresentEntity;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
|
||||
import ca.uhn.fhir.jpa.util.MemoryCacheService;
|
||||
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
|
||||
|
@ -133,7 +133,7 @@ public class ExpungeEverythingService {
|
|||
counter.addAndGet(expungeEverythingByTypeWithoutPurging(NpmPackageVersionResourceEntity.class));
|
||||
counter.addAndGet(expungeEverythingByTypeWithoutPurging(NpmPackageVersionEntity.class));
|
||||
counter.addAndGet(expungeEverythingByTypeWithoutPurging(NpmPackageEntity.class));
|
||||
counter.addAndGet(expungeEverythingByTypeWithoutPurging(SearchParamPresent.class));
|
||||
counter.addAndGet(expungeEverythingByTypeWithoutPurging(SearchParamPresentEntity.class));
|
||||
counter.addAndGet(expungeEverythingByTypeWithoutPurging(BulkImportJobFileEntity.class));
|
||||
counter.addAndGet(expungeEverythingByTypeWithoutPurging(BulkImportJobEntity.class));
|
||||
counter.addAndGet(expungeEverythingByTypeWithoutPurging(ForcedId.class));
|
||||
|
|
|
@@ -35,7 +35,6 @@ import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
 import ca.uhn.fhir.jpa.model.cross.IResourceLookup;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
 import ca.uhn.fhir.jpa.searchparam.extractor.IResourceLinkResolver;
-import ca.uhn.fhir.jpa.util.MemoryCacheService;
 import ca.uhn.fhir.mdm.util.CanonicalIdentifier;
 import ca.uhn.fhir.rest.api.server.RequestDetails;
 import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
@@ -62,6 +61,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.PersistenceContext;
 import javax.persistence.PersistenceContextType;
 import java.nio.charset.StandardCharsets;
+import java.util.Date;
 import java.util.List;
 import java.util.Optional;
@@ -80,11 +80,23 @@ public class DaoResourceLinkResolver implements IResourceLinkResolver {
 
 	@Override
 	public IResourceLookup findTargetResource(@Nonnull RequestPartitionId theRequestPartitionId, RuntimeSearchParam theSearchParam, String theSourcePath, IIdType theSourceResourceId, String theResourceType, Class<? extends IBaseResource> theType, IBaseReference theReference, RequestDetails theRequest, TransactionDetails theTransactionDetails) {
+		ResourcePersistentId persistentId = null;
+		if (theTransactionDetails != null) {
+			ResourcePersistentId resolvedResourceId = theTransactionDetails.getResolvedResourceId(theSourceResourceId);
+			if (resolvedResourceId != null && resolvedResourceId.getIdAsLong() != null && resolvedResourceId.getAssociatedResourceId() != null) {
+				persistentId = resolvedResourceId;
+			}
+		}
+
 		IResourceLookup resolvedResource;
 		String idPart = theSourceResourceId.getIdPart();
 		try {
-			resolvedResource = myIdHelperService.resolveResourceIdentity(theRequestPartitionId, theResourceType, idPart);
-			ourLog.trace("Translated {}/{} to resource PID {}", theType, idPart, resolvedResource);
+			if (persistentId == null) {
+				resolvedResource = myIdHelperService.resolveResourceIdentity(theRequestPartitionId, theResourceType, idPart);
+				ourLog.trace("Translated {}/{} to resource PID {}", theType, idPart, resolvedResource);
+			} else {
+				resolvedResource = new ResourceLookupPersistentIdWrapper(persistentId);
+			}
 		} catch (ResourceNotFoundException e) {
 
 			Optional<ResourceTable> createdTableOpt = createPlaceholderTargetIfConfiguredToDoSo(theType, theReference, idPart, theRequest, theTransactionDetails);
@@ -113,6 +125,12 @@ public class DaoResourceLinkResolver implements IResourceLinkResolver {
 			throw new InvalidRequestException(Msg.code(1096) + "Resource " + resName + "/" + idPart + " is deleted, specified in path: " + theSourcePath);
 		}
 
+		if (persistentId == null) {
+			persistentId = new ResourcePersistentId(resolvedResource.getResourceId());
+			persistentId.setAssociatedResourceId(theSourceResourceId);
+			theTransactionDetails.addResolvedResourceId(theSourceResourceId, persistentId);
+		}
+
 		if (!theSearchParam.hasTargets() && theSearchParam.getTargets().contains(theResourceType)) {
 			return null;
 		}
@@ -271,4 +289,26 @@ public class DaoResourceLinkResolver implements IResourceLinkResolver {
 		myDaoRegistry.getDaoOrThrowException(theType);
 	}
 
+	private static class ResourceLookupPersistentIdWrapper implements IResourceLookup {
+		private final ResourcePersistentId myPersistentId;
+
+		public ResourceLookupPersistentIdWrapper(ResourcePersistentId thePersistentId) {
+			myPersistentId = thePersistentId;
+		}
+
+		@Override
+		public String getResourceType() {
+			return myPersistentId.getAssociatedResourceId().getResourceType();
+		}
+
+		@Override
+		public Long getResourceId() {
+			return myPersistentId.getIdAsLong();
+		}
+
+		@Override
+		public Date getDeleted() {
+			return null;
+		}
+	}
 }
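Note: the wrapper above lets an ID that was already resolved earlier in the same transaction short-circuit the `resolveResourceIdentity` lookup entirely. A generic sketch of that per-transaction memoization pattern, with types simplified rather than HAPI's actual classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class PerTransactionResolutionCache {
	private final Map<String, Long> resolvedIds = new HashMap<>();

	// The first call per resource ID pays the database lookup cost; later
	// calls within the same transaction reuse the cached persistent ID.
	public Long resolve(String resourceId, Function<String, Long> databaseLookup) {
		return resolvedIds.computeIfAbsent(resourceId, databaseLookup);
	}
}
```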
@@ -29,7 +29,7 @@ import ca.uhn.fhir.jpa.dao.predicate.querystack.QueryStack;
 import ca.uhn.fhir.jpa.model.config.PartitionSettings;
 import ca.uhn.fhir.jpa.model.entity.BasePartitionable;
 import ca.uhn.fhir.jpa.model.entity.BaseResourceIndexedSearchParam;
-import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
+import ca.uhn.fhir.jpa.model.entity.SearchParamPresentEntity;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.model.api.IQueryParameterType;
 import ca.uhn.fhir.rest.param.ParamPrefixEnum;
@@ -78,10 +78,10 @@ abstract class BasePredicateBuilder {
 	}
 
 	void addPredicateParamMissingForReference(String theResourceName, String theParamName, boolean theMissing, RequestPartitionId theRequestPartitionId) {
-		From<?, SearchParamPresent> paramPresentJoin = myQueryStack.createJoin(SearchBuilderJoinEnum.PRESENCE, null);
+		From<?, SearchParamPresentEntity> paramPresentJoin = myQueryStack.createJoin(SearchBuilderJoinEnum.PRESENCE, null);
 
 		Expression<Long> hashPresence = paramPresentJoin.get("myHashPresence").as(Long.class);
-		Long hash = SearchParamPresent.calculateHashPresence(myPartitionSettings, theRequestPartitionId, theResourceName, theParamName, !theMissing);
+		Long hash = SearchParamPresentEntity.calculateHashPresence(myPartitionSettings, theRequestPartitionId, theResourceName, theParamName, !theMissing);
 
 		List<Predicate> predicates = new ArrayList<>();
 		predicates.add(myCriteriaBuilder.equal(hashPresence, hash));
|
@ -37,7 +37,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamQuantity;
|
|||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamString;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamToken;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamUri;
|
||||
import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
|
||||
import ca.uhn.fhir.jpa.model.entity.SearchParamPresentEntity;
|
||||
import ca.uhn.fhir.util.VersionEnum;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -1608,7 +1608,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
|
|||
Boolean present = columnToBoolean(t.get("SP_PRESENT"));
|
||||
String resType = (String) t.get("RES_TYPE");
|
||||
String paramName = (String) t.get("PARAM_NAME");
|
||||
Long hash = SearchParamPresent.calculateHashPresence(new PartitionSettings(), (RequestPartitionId) null, resType, paramName, present);
|
||||
Long hash = SearchParamPresentEntity.calculateHashPresence(new PartitionSettings(), (RequestPartitionId) null, resType, paramName, present);
|
||||
consolidateSearchParamPresenceIndexesTask.executeSql("HFJ_RES_PARAM_PRESENT", "update HFJ_RES_PARAM_PRESENT set HASH_PRESENCE = ? where PID = ?", hash, pid);
|
||||
});
|
||||
version.addTask(consolidateSearchParamPresenceIndexesTask);
|
||||
|
|
|
@ -22,7 +22,7 @@ package ca.uhn.fhir.jpa.search.builder.predicate;
|
|||
|
||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
|
||||
import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
|
||||
import ca.uhn.fhir.jpa.model.entity.SearchParamPresentEntity;
|
||||
import ca.uhn.fhir.jpa.search.builder.sql.SearchQueryBuilder;
|
||||
import com.healthmarketscience.sqlbuilder.BinaryCondition;
|
||||
import com.healthmarketscience.sqlbuilder.Condition;
|
||||
|
@ -53,7 +53,7 @@ public class SearchParamPresentPredicateBuilder extends BaseJoiningPredicateBuil
|
|||
|
||||
|
||||
public Condition createPredicateParamMissingForReference(String theResourceName, String theParamName, boolean theMissing, RequestPartitionId theRequestPartitionId) {
|
||||
Long hash = SearchParamPresent.calculateHashPresence(myPartitionSettings, theRequestPartitionId, theResourceName, theParamName, !theMissing);
|
||||
Long hash = SearchParamPresentEntity.calculateHashPresence(myPartitionSettings, theRequestPartitionId, theResourceName, theParamName, !theMissing);
|
||||
BinaryCondition predicate = BinaryCondition.equalTo(myColumnHashPresence, generatePlaceholder(hash));
|
||||
return combineWithRequestPartitionIdPredicate(theRequestPartitionId, predicate);
|
||||
}
|
||||
|
|
|
@@ -24,7 +24,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
 import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao;
 import ca.uhn.fhir.jpa.model.config.PartitionSettings;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
-import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
+import ca.uhn.fhir.jpa.model.entity.SearchParamPresentEntity;
 import ca.uhn.fhir.jpa.util.AddRemoveCount;
 import com.google.common.annotations.VisibleForTesting;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -54,8 +54,7 @@ public class SearchParamPresenceSvcImpl implements ISearchParamPresenceSvc {
 	}
 
 	@Override
-	public AddRemoveCount
-	updatePresence(ResourceTable theResource, Map<String, Boolean> theParamNameToPresence) {
+	public AddRemoveCount updatePresence(ResourceTable theResource, Map<String, Boolean> theParamNameToPresence) {
 		AddRemoveCount retVal = new AddRemoveCount();
 		if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.DISABLED) {
 			return retVal;
@@ -64,19 +63,18 @@ public class SearchParamPresenceSvcImpl implements ISearchParamPresenceSvc {
 		Map<String, Boolean> presenceMap = new HashMap<>(theParamNameToPresence);
 
 		// Find existing entries
-		Collection<SearchParamPresent> existing;
-		existing = mySearchParamPresentDao.findAllForResource(theResource);
-		Map<Long, SearchParamPresent> existingHashToPresence = new HashMap<>();
-		for (SearchParamPresent nextExistingEntity : existing) {
+		Collection<SearchParamPresentEntity> existing = theResource.getSearchParamPresents();
+		Map<Long, SearchParamPresentEntity> existingHashToPresence = new HashMap<>();
+		for (SearchParamPresentEntity nextExistingEntity : existing) {
 			existingHashToPresence.put(nextExistingEntity.getHashPresence(), nextExistingEntity);
 		}
 
 		// Find newly wanted set of entries
-		Map<Long, SearchParamPresent> newHashToPresence = new HashMap<>();
+		Map<Long, SearchParamPresentEntity> newHashToPresence = new HashMap<>();
 		for (Entry<String, Boolean> next : presenceMap.entrySet()) {
 			String paramName = next.getKey();
 
-			SearchParamPresent present = new SearchParamPresent();
+			SearchParamPresentEntity present = new SearchParamPresentEntity();
 			present.setPartitionSettings(myPartitionSettings);
 			present.setResource(theResource);
 			present.setParamName(paramName);
@@ -88,22 +86,33 @@ public class SearchParamPresenceSvcImpl implements ISearchParamPresenceSvc {
 		}
 
 		// Delete any that should be deleted
-		List<SearchParamPresent> toDelete = new ArrayList<>();
-		for (Entry<Long, SearchParamPresent> nextEntry : existingHashToPresence.entrySet()) {
+		List<SearchParamPresentEntity> toDelete = new ArrayList<>();
+		for (Entry<Long, SearchParamPresentEntity> nextEntry : existingHashToPresence.entrySet()) {
 			if (newHashToPresence.containsKey(nextEntry.getKey()) == false) {
 				toDelete.add(nextEntry.getValue());
 			}
 		}
-		mySearchParamPresentDao.deleteAll(toDelete);
-		retVal.addToRemoveCount(toDelete.size());
 
 		// Add any that should be added
-		List<SearchParamPresent> toAdd = new ArrayList<>();
-		for (Entry<Long, SearchParamPresent> nextEntry : newHashToPresence.entrySet()) {
+		List<SearchParamPresentEntity> toAdd = new ArrayList<>();
+		for (Entry<Long, SearchParamPresentEntity> nextEntry : newHashToPresence.entrySet()) {
 			if (existingHashToPresence.containsKey(nextEntry.getKey()) == false) {
 				toAdd.add(nextEntry.getValue());
 			}
 		}
+
+		// Try to reuse any entities we can
+		while (toDelete.size() > 0 && toAdd.size() > 0) {
+			SearchParamPresentEntity nextToDelete = toDelete.remove(toDelete.size() - 1);
+			SearchParamPresentEntity nextToAdd = toAdd.remove(toAdd.size() - 1);
+			nextToDelete.updateValues(nextToAdd);
+			mySearchParamPresentDao.save(nextToDelete);
+			retVal.addToAddCount(1);
+			retVal.addToRemoveCount(1);
+		}
+
+		mySearchParamPresentDao.deleteAll(toDelete);
+		retVal.addToRemoveCount(toDelete.size());
 
 		mySearchParamPresentDao.saveAll(toAdd);
 		retVal.addToRemoveCount(toAdd.size());
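Note: this reuse loop is the core of the changelog's perf claim. Instead of issuing a DELETE plus an INSERT for each changed presence flag, an existing row is rewritten in place, which becomes a single UPDATE. A standalone sketch of the pattern with a hypothetical `Row` type:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RowReuseSketch {
	static class Row {
		long hash;
		boolean present;
	}

	// Pair off rows slated for deletion with rows slated for insertion:
	// each pair becomes a single UPDATE instead of a DELETE + INSERT.
	static void reconcile(Deque<Row> toDelete, Deque<Row> toAdd) {
		while (!toDelete.isEmpty() && !toAdd.isEmpty()) {
			Row victim = toDelete.pop();
			Row wanted = toAdd.pop();
			victim.hash = wanted.hash;       // overwrite mutable columns
			victim.present = wanted.present; // row identity (primary key) is kept
			// persist(victim) -> UPDATE
		}
		// leftovers: real DELETEs for toDelete, real INSERTs for toAdd
	}
}
```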
|
@ -8,6 +8,7 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
|
|||
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
|
||||
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
|
||||
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
|
||||
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
|
||||
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
|
||||
import ca.uhn.fhir.util.JsonUtil;
|
||||
|
|
|
@ -203,7 +203,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
|
|||
|
||||
int fileCount = 3;
|
||||
List<String> indexes = addFiles(fileCount - 1);
|
||||
indexes.add(myBulkImportFileServlet.registerFile(() -> new StringReader("{\"resourceType\":\"Foo\"}")));
|
||||
indexes.add(myBulkImportFileServlet.registerFileByContents("{\"resourceType\":\"Foo\"}"));
|
||||
|
||||
BulkImportJobParameters parameters = new BulkImportJobParameters();
|
||||
for (String next : indexes) {
|
||||
|
@@ -378,7 +378,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
 			builder.append("\n");
 			builder.append("\n");
 
-			String index = myBulkImportFileServlet.registerFile(() -> new StringReader(builder.toString()));
+			String index = myBulkImportFileServlet.registerFileByContents(builder.toString());
 			retVal.add(index);
 		}
 		return retVal;
@@ -3557,7 +3557,10 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test {
 		myCaptureQueriesListener.clear();
 		myObservationDao.update(obs);
 
-		assertEquals(2, myCaptureQueriesListener.countUpdateQueries());
+		assertEquals(10, myCaptureQueriesListener.countSelectQueries());
+		assertEquals(5, myCaptureQueriesListener.countUpdateQueries());
+		assertEquals(1, myCaptureQueriesListener.countInsertQueries());
+		assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
 		String unformattedSql = myCaptureQueriesListener.getUpdateQueriesForCurrentThread().get(0).getSql(true, false);
 		assertThat(unformattedSql, stringContainsInOrder(
 			"SRC_PATH='Observation.performer'",
@@ -19,7 +19,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamString;
 import ca.uhn.fhir.jpa.model.entity.ResourceLink;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
 import ca.uhn.fhir.jpa.model.entity.ResourceTag;
-import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
+import ca.uhn.fhir.jpa.model.entity.SearchParamPresentEntity;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
 import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@@ -442,7 +442,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
 		assertEquals(myPartitionDate, resourceLinks.get(0).getPartitionId().getPartitionDate());
 
 		// HFJ_RES_PARAM_PRESENT
-		List<SearchParamPresent> presents = mySearchParamPresentDao.findAllForResource(resourceTable);
+		List<SearchParamPresentEntity> presents = mySearchParamPresentDao.findAllForResource(resourceTable);
 		assertEquals(3, presents.size());
 		assertEquals(myPartitionId, presents.get(0).getPartitionId().getPartitionId().intValue());
 		assertEquals(myPartitionDate, presents.get(0).getPartitionId().getPartitionDate());
@@ -529,7 +529,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
 		assertEquals(myPartitionDate, resourceLinks.get(0).getPartitionId().getPartitionDate());
 
 		// HFJ_RES_PARAM_PRESENT
-		List<SearchParamPresent> presents = mySearchParamPresentDao.findAllForResource(resourceTable);
+		List<SearchParamPresentEntity> presents = mySearchParamPresentDao.findAllForResource(resourceTable);
 		assertEquals(3, presents.size());
 		assertEquals(null, presents.get(0).getPartitionId().getPartitionId());
 		assertEquals(myPartitionDate, presents.get(0).getPartitionId().getPartitionDate());
@@ -3,11 +3,18 @@ package ca.uhn.fhir.jpa.delete.job;
 import ca.uhn.fhir.batch2.api.IJobCoordinator;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
+import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
+import ca.uhn.fhir.batch2.model.StatusEnum;
+import ca.uhn.fhir.interceptor.api.HookParams;
+import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
+import ca.uhn.fhir.interceptor.api.IPointcut;
+import ca.uhn.fhir.interceptor.api.Pointcut;
 import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import org.hl7.fhir.instance.model.api.IIdType;
 import org.hl7.fhir.r4.model.Observation;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
@@ -29,6 +36,11 @@ public class ReindexJobTest extends BaseJpaR4Test {
 		myReindexTestHelper = new ReindexTestHelper(myFhirContext, myDaoRegistry, mySearchParamRegistry);
 	}
 
+	@AfterEach
+	public void after() {
+		myInterceptorRegistry.unregisterAllAnonymousInterceptors();
+	}
+
 	@Test
 	public void testReindex_ByUrl() {
 		// setup
@@ -92,4 +104,61 @@ public class ReindexJobTest extends BaseJpaR4Test {
 		assertThat(myReindexTestHelper.getAlleleObservationIds(), hasSize(50));
 	}
 
+
+	@Test
+	public void testReindex_ExceptionThrownDuringWrite() {
+		// setup
+
+		myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);
+		myReindexTestHelper.createAlleleSearchParameter();
+		mySearchParamRegistry.forceRefresh();
+
+		// Throw an exception during reindex
+
+		IAnonymousInterceptor exceptionThrowingInterceptor = (pointcut, args) -> {
+			throw new NullPointerException("foo message");
+		};
+		myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.JPA_PERFTRACE_INFO, exceptionThrowingInterceptor);
+
+		// execute
+		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
+		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setParameters(new ReindexJobParameters());
+		String id = myJobCoordinator.startInstance(startRequest);
+		JobInstance outcome = myBatch2JobHelper.awaitJobFailure(id);
+
+		// Verify
+
+		assertEquals(StatusEnum.ERRORED, outcome.getStatus());
+		assertEquals("java.lang.NullPointerException: foo message", outcome.getErrorMessage());
+	}
+
+	@Test
+	public void testReindex_FailureThrownDuringWrite() {
+		// setup
+
+		myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);
+		myReindexTestHelper.createAlleleSearchParameter();
+		mySearchParamRegistry.forceRefresh();
+
+		// Throw an error (will be treated as unrecoverable) during reindex
+
+		IAnonymousInterceptor exceptionThrowingInterceptor = (pointcut, args) -> {
+			throw new Error("foo message");
+		};
+		myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.JPA_PERFTRACE_INFO, exceptionThrowingInterceptor);
+
+		// execute
+		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
+		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setParameters(new ReindexJobParameters());
+		String id = myJobCoordinator.startInstance(startRequest);
+		JobInstance outcome = myBatch2JobHelper.awaitJobFailure(id);
+
+		// Verify
+
+		assertEquals(StatusEnum.FAILED, outcome.getStatus());
+		assertEquals("java.lang.Error: foo message", outcome.getErrorMessage());
+	}
+
 }
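Note: the two tests above pin down the error-handling split this commit is named for: a thrown `Exception` leaves the job in the retryable ERRORED state, while a thrown `Error` is treated as unrecoverable and drives the job to FAILED. A deliberately simplified sketch of that distinction; HAPI's actual dispatch lives in the batch2 step executor, not in a helper like this:

```java
public class FailureClassificationSketch {
	enum Status { ERRORED, FAILED }

	// Mirrors what the tests assert: ordinary exceptions are retryable
	// job errors; Errors and other Throwables are terminal failures.
	static Status classify(Throwable t) {
		return (t instanceof Exception) ? Status.ERRORED : Status.FAILED;
	}
}
```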
@@ -5,9 +5,15 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
 import ca.uhn.fhir.batch2.api.VoidModel;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexChunkIds;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
+import ca.uhn.fhir.jpa.api.config.DaoConfig;
 import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
+import ca.uhn.fhir.model.dstu2.resource.Patient;
+import org.hl7.fhir.instance.model.api.IIdType;
+import org.hl7.fhir.r4.model.Enumerations;
 import org.hl7.fhir.r4.model.IdType;
+import org.hl7.fhir.r4.model.SearchParameter;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
@@ -31,6 +37,11 @@ public class ReindexStepTest extends BaseJpaR4Test {
 	@Captor
 	private ArgumentCaptor<String> myErrorCaptor;
 
+	@AfterEach
+	public void after() {
+		myDaoConfig.setIndexMissingFields(new DaoConfig().getIndexMissingFields());
+	}
+
 	@Test
 	public void testReindex_NoActionNeeded() {
 
@@ -59,6 +70,36 @@ public class ReindexStepTest extends BaseJpaR4Test {
 	}
 
 
+	@Test
+	public void testReindex_NoActionNeeded_IndexMissingFieldsEnabled() {
+
+		// Setup
+
+		myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
+
+		Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
+		Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
+
+		ReindexChunkIds data = new ReindexChunkIds();
+		data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
+		data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
+
+		// Execute
+
+		myCaptureQueriesListener.clear();
+		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
+
+		// Verify
+		assertEquals(2, outcome.getRecordsProcessed());
+		assertEquals(6, myCaptureQueriesListener.logSelectQueries().size());
+		assertEquals(0, myCaptureQueriesListener.countInsertQueries());
+		assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
+		assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
+		assertEquals(1, myCaptureQueriesListener.getCommitCount());
+		assertEquals(0, myCaptureQueriesListener.getRollbackCount());
+	}
+
+
 	@Test
 	public void testReindex_IndexesWereMissing() {
 
@@ -93,6 +134,70 @@ public class ReindexStepTest extends BaseJpaR4Test {
 	}
 
 
+	@Test
+	public void testReindex_IndexesAddedAndRemoved_IndexMissingFieldsEnabled() {
+
+		// Setup
+
+		myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
+
+		IIdType orgId = createOrganization(withId("ORG"));
+		Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON"), withOrganization(orgId)).getIdPartAsLong();
+		Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS"), withOrganization(orgId)).getIdPartAsLong();
+
+		ReindexChunkIds data = new ReindexChunkIds();
+		data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
+		data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
+
+		SearchParameter sp = new SearchParameter();
+		sp.setType(Enumerations.SearchParamType.STRING);
+		sp.addBase("Patient");
+		sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
+		sp.setCode("family2");
+		sp.setExpression("Patient.name.family");
+		mySearchParameterDao.create(sp);
+
+		sp = new SearchParameter();
+		sp.setType(Enumerations.SearchParamType.REFERENCE);
+		sp.addBase("Patient");
+		sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
+		sp.setCode(Patient.SP_ORGANIZATION + "2");
+		sp.setExpression("Patient.managingOrganization");
+		mySearchParameterDao.create(sp);
+
+		sp = new SearchParameter();
+		sp.setType(Enumerations.SearchParamType.STRING);
+		sp.addBase("Patient");
+		sp.setStatus(Enumerations.PublicationStatus.RETIRED);
+		sp.setCode("family");
+		sp.setExpression("Patient.name.family");
+		mySearchParameterDao.create(sp);
+
+		sp = new SearchParameter();
+		sp.setType(Enumerations.SearchParamType.REFERENCE);
+		sp.addBase("Patient");
+		sp.setStatus(Enumerations.PublicationStatus.RETIRED);
+		sp.setCode(Patient.SP_ORGANIZATION);
+		sp.setExpression("Patient.managingOrganization");
+		mySearchParameterDao.create(sp);
+
+		mySearchParamRegistry.forceRefresh();
+
+		// Execute
+
+		myCaptureQueriesListener.clear();
+		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
+
+		// Verify
+		assertEquals(2, outcome.getRecordsProcessed());
+		assertEquals(10, myCaptureQueriesListener.logSelectQueries().size());
+		assertEquals(0, myCaptureQueriesListener.countInsertQueries());
+		assertEquals(4, myCaptureQueriesListener.countUpdateQueries());
+		assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
+		assertEquals(1, myCaptureQueriesListener.getCommitCount());
+		assertEquals(0, myCaptureQueriesListener.getRollbackCount());
+	}
+
 	@Test
 	public void testReindex_OneResourceReindexFailedButOthersSucceeded() {
 
@@ -2,11 +2,11 @@ package ca.uhn.fhir.jpa.util;
 
 import ca.uhn.fhir.batch2.api.IJobCleanerService;
 import ca.uhn.fhir.batch2.api.IJobCoordinator;
+import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.StatusEnum;
+import org.hamcrest.Matchers;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.concurrent.TimeUnit;
-
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -25,4 +25,12 @@ public class Batch2JobHelper {
 		}, equalTo(StatusEnum.COMPLETED));
 	}
 
+	public JobInstance awaitJobFailure(String theId) {
+		await().until(() -> {
+			myJobCleanerService.runCleanupPass();
+			return myJobCoordinator.getInstance(theId).getStatus();
+		}, Matchers.anyOf(equalTo(StatusEnum.ERRORED), equalTo(StatusEnum.FAILED)));
+		return myJobCoordinator.getInstance(theId);
+	}
+
 }
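Note: the helper leans on Awaitility's two-argument `until(callable, matcher)`, polling the job status (and running a cleanup pass each poll) until either terminal state matches. A self-contained sketch of that polling shape, with a counter standing in for the job coordinator:

```java
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitilitySketch {
	public static void main(String[] args) {
		AtomicInteger pollCount = new AtomicInteger();

		// Poll the callable until the matcher accepts its result; the same
		// shape the helper uses to wait for ERRORED or FAILED.
		await().atMost(30, TimeUnit.SECONDS).until(
			() -> pollCount.incrementAndGet() >= 3 ? "FAILED" : "IN_PROGRESS",
			anyOf(equalTo("ERRORED"), equalTo("FAILED")));
	}
}
```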
@@ -264,7 +264,7 @@ public class ResourceTable extends BaseHasResource implements Serializable, IBas
 
 	@OneToMany(mappedBy = "myResource", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true)
 	@OptimisticLock(excluded = true)
-	private Collection<SearchParamPresent> mySearchParamPresents;
+	private Collection<SearchParamPresentEntity> mySearchParamPresents;
 
 	@OneToMany(mappedBy = "myResource", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true)
 	@OptimisticLock(excluded = true)
@@ -775,4 +775,11 @@ public class ResourceTable extends BaseHasResource implements Serializable, IBas
 	public void setLuceneIndexData(ExtendedLuceneIndexData theLuceneIndexData) {
 		myLuceneIndexData = theLuceneIndexData;
 	}
+
+	public Collection<SearchParamPresentEntity> getSearchParamPresents() {
+		if (mySearchParamPresents == null) {
+			mySearchParamPresents = new ArrayList<>();
+		}
+		return mySearchParamPresents;
+	}
 }
@@ -34,7 +34,7 @@ import java.io.Serializable;
 	@Index(name = "IDX_RESPARMPRESENT_RESID", columnList = "RES_ID"),
 	@Index(name = "IDX_RESPARMPRESENT_HASHPRES", columnList = "HASH_PRESENCE")
 })
-public class SearchParamPresent extends BasePartitionable implements Serializable {
+public class SearchParamPresentEntity extends BasePartitionable implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -60,7 +60,7 @@ public class SearchParamPresent extends BasePartitionable implements Serializabl
 	/**
 	 * Constructor
 	 */
-	public SearchParamPresent() {
+	public SearchParamPresentEntity() {
 		super();
 	}
 
@@ -126,6 +126,18 @@ public class SearchParamPresent extends BasePartitionable implements Serializabl
 		myPartitionSettings = thePartitionSettings;
 	}
 
+	/**
+	 * Copy all mutable values from the given source
+	 */
+	public void updateValues(SearchParamPresentEntity theSource) {
+		super.setPartitionId(theSource.getPartitionId());
+		setResource(theSource.getResource());
+		setPartitionSettings(theSource.getPartitionSettings());
+		setHashPresence(theSource.getHashPresence());
+		setParamName(theSource.getParamName());
+		setPresent(theSource.isPresent());
+	}
+
 	public static long calculateHashPresence(PartitionSettings thePartitionSettings, PartitionablePartitionId theRequestPartitionId, String theResourceType, String theParamName, Boolean thePresent) {
 		RequestPartitionId requestPartitionId = PartitionablePartitionId.toRequestPartitionId(theRequestPartitionId);
 		return calculateHashPresence(thePartitionSettings, requestPartitionId, theResourceType, theParamName, thePresent);
@@ -16,6 +16,7 @@ import ca.uhn.fhir.jpa.provider.JpaCapabilityStatementProvider;
 import ca.uhn.fhir.jpa.provider.JpaConformanceProviderDstu2;
 import ca.uhn.fhir.jpa.provider.JpaSystemProviderDstu2;
 import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;
+import ca.uhn.fhir.jpa.provider.ValueSetOperationProvider;
 import ca.uhn.fhir.jpa.provider.dstu3.JpaConformanceProviderDstu3;
 import ca.uhn.fhir.jpa.provider.dstu3.JpaSystemProviderDstu3;
 import ca.uhn.fhir.jpa.provider.r4.JpaSystemProviderR4;
@@ -265,20 +266,12 @@ public class TestRestfulServer extends RestfulServer {
 		registerInterceptor(cascadingDeleteInterceptor);
 
 		/*
-		 * Bulk Export
+		 * Register some providers
 		 */
 		registerProvider(myAppCtx.getBean(BulkDataExportProvider.class));
-
-		/*
-		 * $reindex
-		 */
 		registerProvider(myAppCtx.getBean(ReindexProvider.class));
-
-
-		/*
-		 * $diff operation
-		 */
 		registerProvider(myAppCtx.getBean(DiffProvider.class));
+		registerProvider(myAppCtx.getBean(ValueSetOperationProvider.class));
 
 		/*
 		 * OpenAPI
@@ -63,13 +63,6 @@
 	<build>
 		<pluginManagement>
 			<plugins>
-				<plugin>
-					<groupId>org.codehaus.mojo</groupId>
-					<artifactId>animal-sniffer-maven-plugin</artifactId>
-					<configuration>
-						<skip>true</skip>
-					</configuration>
-				</plugin>
 				<plugin>
 					<groupId>org.basepom.maven</groupId>
 					<artifactId>duplicate-finder-maven-plugin</artifactId>
@@ -26,6 +26,7 @@ import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
 import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
 import ca.uhn.fhir.util.UrlUtil;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ReaderInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +35,11 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -46,8 +51,8 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 
 public class BulkImportFileServlet extends HttpServlet {
 
-	private static final long serialVersionUID = 8302513561762436076L;
 	public static final String INDEX_PARAM = "index";
+	private static final long serialVersionUID = 8302513561762436076L;
 	private static final Logger ourLog = LoggerFactory.getLogger(BulkImportFileServlet.class);
 	private final Map<String, IFileSupplier> myFileIds = new HashMap<>();
 
@@ -93,8 +98,17 @@ public class BulkImportFileServlet extends HttpServlet {
 		theResponse.addHeader(Constants.HEADER_CONTENT_TYPE, CT_FHIR_NDJSON + CHARSET_UTF8_CTSUFFIX);
 
 		IFileSupplier supplier = myFileIds.get(indexParam);
-		try (Reader reader = supplier.get()) {
-			IOUtils.copy(reader, theResponse.getWriter());
+		if (supplier.isGzip()) {
+			theResponse.addHeader(Constants.HEADER_CONTENT_ENCODING, Constants.ENCODING_GZIP);
+		}
+
+		try (Reader reader = new InputStreamReader(supplier.get())) {
+			String string = IOUtils.toString(reader);
+			ourLog.info(string);
+		}
+
+		try (InputStream reader = supplier.get()) {
+			IOUtils.copy(reader, theResponse.getOutputStream());
 		}
 
 	}
@@ -112,10 +126,29 @@ public class BulkImportFileServlet extends HttpServlet {
 		return index;
 	}
 
-	@FunctionalInterface
+	/**
+	 * Mostly intended for unit tests, registers a file
+	 * using raw file contents.
+	 */
+	public String registerFileByContents(String theFileContents) {
+		return registerFile(new IFileSupplier() {
+			@Override
+			public boolean isGzip() {
+				return false;
+			}
+
+			@Override
+			public InputStream get() {
+				return new ReaderInputStream(new StringReader(theFileContents), StandardCharsets.UTF_8);
+			}
+		});
+	}
+
 	public interface IFileSupplier {
 
-		Reader get() throws IOException;
+		boolean isGzip();
+
+		InputStream get() throws IOException;
 
 	}
 
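Note: because `IFileSupplier` now exposes raw bytes plus an `isGzip` flag, the servlet can stream a pre-compressed file verbatim and simply advertise `Content-Encoding: gzip`, leaving decompression to the client. A hedged client-side sketch using plain `HttpURLConnection`; the endpoint URL is hypothetical:

```java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public class GzipAwareFetch {
	public static void main(String[] args) throws Exception {
		URL url = new URL("http://localhost:8080/download?index=abc"); // hypothetical endpoint
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();

		// Honour the Content-Encoding header the servlet now sets for .gz sources
		InputStream body = conn.getInputStream();
		if ("gzip".equalsIgnoreCase(conn.getHeaderField("Content-Encoding"))) {
			body = new GZIPInputStream(body);
		}
		try (BufferedReader reader = new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8))) {
			reader.lines().forEach(System.out::println);
		}
	}
}
```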
|
|
@ -26,9 +26,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
|
|||
import ca.uhn.fhir.batch2.api.RunOutcome;
|
||||
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
|
||||
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
@ -39,6 +37,7 @@ import java.util.Collection;
|
|||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -48,12 +47,6 @@ public class LoadIdsStep implements IJobStepWorker<ReindexJobParameters, Reindex
|
|||
@Autowired
|
||||
private IResourceReindexSvc myResourceReindexSvc;
|
||||
|
||||
@Autowired
|
||||
private DaoRegistry myDaoRegistry;
|
||||
|
||||
@Autowired
|
||||
private MatchUrlService myMatchUrlService;
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexChunkRange> theStepExecutionDetails, @Nonnull IJobDataSink<ReindexChunkIds> theDataSink) throws JobExecutionFailedException {
|
||||
|
@ -67,7 +60,7 @@ public class LoadIdsStep implements IJobStepWorker<ReindexJobParameters, Reindex
|
|||
|
||||
Date nextStart = start;
|
||||
RequestPartitionId requestPartitionId = theStepExecutionDetails.getParameters().getRequestPartitionId();
|
||||
Set<ReindexChunkIds.Id> idBuffer = new HashSet<>();
|
||||
Set<ReindexChunkIds.Id> idBuffer = new LinkedHashSet<>();
|
||||
long previousLastTime = 0L;
|
||||
int totalIdsFound = 0;
|
||||
int chunkCount = 0;
|
||||
|
@ -84,13 +77,6 @@ public class LoadIdsStep implements IJobStepWorker<ReindexJobParameters, Reindex
|
|||
|
||||
ourLog.info("Found {} IDs from {} to {}", nextChunk.getIds().size(), nextStart, nextChunk.getLastDate());
|
||||
|
||||
// If we get the same last time twice in a row, we've clearly reached the end
|
||||
if (nextChunk.getLastDate().getTime() == previousLastTime) {
|
||||
ourLog.info("Matching final timestamp of {}, loading is completed", new Date(previousLastTime));
|
||||
break;
|
||||
}
|
||||
previousLastTime = nextChunk.getLastDate().getTime();
|
||||
|
||||
for (int i = 0; i < nextChunk.getIds().size(); i++) {
|
||||
ReindexChunkIds.Id nextId = new ReindexChunkIds.Id();
|
||||
nextId.setResourceType(nextChunk.getResourceTypes().get(i));
|
||||
|
@ -98,9 +84,16 @@ public class LoadIdsStep implements IJobStepWorker<ReindexJobParameters, Reindex
|
|||
idBuffer.add(nextId);
|
||||
}
|
||||
|
||||
// If we get the same last time twice in a row, we've clearly reached the end
|
||||
if (nextChunk.getLastDate().getTime() == previousLastTime) {
|
||||
ourLog.info("Matching final timestamp of {}, loading is completed", new Date(previousLastTime));
|
||||
break;
|
||||
}
|
||||
|
||||
previousLastTime = nextChunk.getLastDate().getTime();
|
||||
nextStart = nextChunk.getLastDate();
|
||||
|
||||
if (idBuffer.size() >= 1000) {
|
||||
while (idBuffer.size() >= 1000) {
|
||||
|
||||
List<ReindexChunkIds.Id> submissionIds = new ArrayList<>();
|
||||
for (Iterator<ReindexChunkIds.Id> iter = idBuffer.iterator(); iter.hasNext(); ) {
|
||||
|
|
|
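Three things change in this loop: the buffer becomes a `LinkedHashSet` (deduplication plus stable submission order), the end-of-data check moves below the ID copy so the final chunk's IDs are still buffered before the loop exits, and the `if` becomes a `while` so a buffer that has grown past one page drains completely. A standalone sketch of that drain pattern under those assumptions, with plain strings standing in for `ReindexChunkIds.Id`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Minimal sketch of the drain pattern in this hunk: a LinkedHashSet keeps
// insertion order while deduplicating, and a while-loop (rather than a single
// if) keeps emitting 1000-element pages until the buffer is below one page.
public class BufferDrainSketch {

	static final int PAGE_SIZE = 1000;

	static List<List<String>> drain(Set<String> idBuffer) {
		List<List<String>> pages = new ArrayList<>();
		while (idBuffer.size() >= PAGE_SIZE) {
			List<String> submissionIds = new ArrayList<>();
			for (Iterator<String> iter = idBuffer.iterator(); iter.hasNext(); ) {
				submissionIds.add(iter.next());
				iter.remove();
				if (submissionIds.size() >= PAGE_SIZE) {
					break;
				}
			}
			pages.add(submissionIds); // in the real step this is theDataSink.accept(...)
		}
		return pages;
	}

	public static void main(String[] args) {
		Set<String> buffer = new LinkedHashSet<>();
		for (int i = 0; i < 2500; i++) {
			buffer.add("Patient/" + i);
		}
		System.out.println(drain(buffer).size() + " pages, " + buffer.size() + " left");
		// -> 2 pages, 500 left
	}
}
```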
ReindexChunkIds.java

@@ -24,6 +24,8 @@ import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import java.util.ArrayList;
import java.util.List;

@@ -40,6 +42,13 @@ public class ReindexChunkIds implements IModelJson {
		return myIds;
	}

	@Override
	public String toString() {
		return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
			.append("ids", myIds)
			.toString();
	}

	public static class Id implements IModelJson {

@@ -48,6 +57,13 @@ public class ReindexChunkIds implements IModelJson {
		@JsonProperty("id")
		private String myId;

		@Override
		public String toString() {
			// We put a space in here and not a "/" since this is a PID, not
			// a resource ID
			return "[" + myResourceType + " " + myId + "]";
		}

		public String getResourceType() {
			return myResourceType;
		}
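A brief usage note on the new `toString()` overrides, using the fluent `Id` setters exercised by the new `LoadIdsStepTest` further down; the wrapper class is illustrative only:

```java
// Illustrative sketch of the new toString() output.
public class IdToStringExample {
	public static void main(String[] args) {
		ReindexChunkIds.Id id = new ReindexChunkIds.Id().setResourceType("Patient").setId("123");
		// Prints "[Patient 123]" with a space rather than "/", because "123"
		// is a persistent ID (PID), not a resource ID.
		System.out.println(id);
	}
}
```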
ReindexChunkRange.java

@@ -63,8 +63,9 @@ public class ReindexChunkRange implements IModelJson {
		return myStart;
	}

	public void setStart(@Nonnull Date theStart) {
	public ReindexChunkRange setStart(@Nonnull Date theStart) {
		myStart = theStart;
		return this;
	}

	@Nonnull

@@ -72,8 +73,9 @@ public class ReindexChunkRange implements IModelJson {
		return myEnd;
	}

	public void setEnd(@Nonnull Date theEnd) {
	public ReindexChunkRange setEnd(@Nonnull Date theEnd) {
		myEnd = theEnd;
		return this;
	}

}
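Both setters now return `this`, so a range can be built in a single chained expression; the new `LoadIdsStepTest` constructs its input exactly this way. A small self-contained sketch:

```java
import java.util.Date;
import org.hl7.fhir.r4.model.InstantType;

// Usage sketch: the fluent setters allow one-expression construction.
public class ChunkRangeExample {
	public static void main(String[] args) {
		Date start = new InstantType("2022-01-01T00:00:00Z").getValue();
		Date end = new InstantType("2022-02-01T00:00:00Z").getValue();
		ReindexChunkRange range = new ReindexChunkRange().setStart(start).setEnd(end);
	}
}
```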
ReindexStep.java

@@ -30,7 +30,6 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.parser.DataFormatException;

@@ -56,8 +55,6 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Reindex
	@Autowired
	private HapiTransactionService myHapiTransactionService;
	@Autowired
	private IResourceReindexSvc myResourceReindexSvc;
	@Autowired
	private IFhirSystemDao<?, ?> mySystemDao;
	@Autowired
	private DaoRegistry myDaoRegistry;
BulkImportFileServletTest.java

@@ -36,7 +36,7 @@ public class BulkImportFileServletTest {
	public void testDownloadFile() throws IOException {
		String input = "{\"resourceType\":\"Patient\", \"id\": \"A\", \"active\": true}\n" +
			"{\"resourceType\":\"Patient\", \"id\": \"B\", \"active\": false}";
		String index = mySvc.registerFile(() -> new StringReader(input));
		String index = mySvc.registerFileByContents(input);

		CloseableHttpClient client = myServletExtension.getHttpClient();
FetchFilesStepTest.java

@@ -42,7 +42,7 @@ public class FetchFilesStepTest {

		// Setup

		String index = myBulkImportFileServlet.registerFile(() -> new StringReader("{\"resourceType\":\"Patient\"}"));
		String index = myBulkImportFileServlet.registerFileByContents("{\"resourceType\":\"Patient\"}");

		BulkImportJobParameters parameters = new BulkImportJobParameters()
			.addNdJsonUrl(myHttpServletExtension.getBaseUrl() + "/download?index=" + index)

@@ -71,7 +71,7 @@ public class FetchFilesStepTest {
			b.append("{\"resourceType\":\"Patient\"}").append("\n");
		}
		String resource = b.toString();
		String index = myBulkImportFileServlet.registerFile(() -> new StringReader(resource));
		String index = myBulkImportFileServlet.registerFileByContents(resource);

		BulkImportJobParameters parameters = new BulkImportJobParameters()
			.addNdJsonUrl(myHttpServletExtension.getBaseUrl() + "/download?index=" + index)

@@ -93,7 +93,7 @@ public class FetchFilesStepTest {

		// Setup

		String index = myBulkImportFileServlet.registerFile(() -> new StringReader("{\"resourceType\":\"Patient\"}"));
		String index = myBulkImportFileServlet.registerFileByContents("{\"resourceType\":\"Patient\"}");

		BulkImportJobParameters parameters = new BulkImportJobParameters()
			.addNdJsonUrl(myHttpServletExtension.getBaseUrl() + "/download?index=" + index)
LoadIdsStepTest.java (new file)

@@ -0,0 +1,98 @@
package ca.uhn.fhir.batch2.jobs.reindex;

import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.r4.model.InstantType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class LoadIdsStepTest {

	public static final Date DATE_1 = new InstantType("2022-01-01T00:00:00Z").getValue();
	public static final Date DATE_2 = new InstantType("2022-01-02T00:00:00Z").getValue();
	public static final Date DATE_3 = new InstantType("2022-01-03T00:00:00Z").getValue();
	public static final Date DATE_4 = new InstantType("2022-01-04T00:00:00Z").getValue();
	public static final Date DATE_END = new InstantType("2022-02-01T00:00:00Z").getValue();

	@Mock
	private IResourceReindexSvc myResourceReindexSvc;

	@Mock
	private IJobDataSink<ReindexChunkIds> mySink;

	@InjectMocks
	private LoadIdsStep mySvc;

	@Captor
	private ArgumentCaptor<ReindexChunkIds> myChunkIdsCaptor;

	@Test
	public void testGenerateSteps() {
		ReindexJobParameters parameters = new ReindexJobParameters();
		ReindexChunkRange range = new ReindexChunkRange()
			.setStart(DATE_1)
			.setEnd(DATE_END);
		String instanceId = "instance-id";
		String chunkId = "chunk-id";
		StepExecutionDetails<ReindexJobParameters, ReindexChunkRange> details = new StepExecutionDetails<>(parameters, range, instanceId, chunkId);

		// First Execution

		when(myResourceReindexSvc.fetchResourceIdsPage(eq(DATE_1), eq(DATE_END), isNull(), isNull()))
			.thenReturn(createIdChunk(0L, 20000L, DATE_2));
		when(myResourceReindexSvc.fetchResourceIdsPage(eq(DATE_2), eq(DATE_END), isNull(), isNull()))
			.thenReturn(createIdChunk(20000L, 40000L, DATE_3));
		when(myResourceReindexSvc.fetchResourceIdsPage(eq(DATE_3), eq(DATE_END), isNull(), isNull()))
			.thenReturn(createIdChunk(40000L, 40040L, DATE_3));

		mySvc.run(details, mySink);

		verify(mySink, times(41)).accept(myChunkIdsCaptor.capture());
		for (int i = 0; i < 40; i++) {
			assertEquals(createIdChunk(i * 1000, (i * 1000) + 1000).toString(), myChunkIdsCaptor.getAllValues().get(i).toString());
		}
		assertEquals(createIdChunk(40000, 40040).toString(), myChunkIdsCaptor.getAllValues().get(40).toString());

	}

	@Nonnull
	private ReindexChunkIds createIdChunk(int theLow, int theHigh) {
		ReindexChunkIds retVal = new ReindexChunkIds();
		for (int i = theLow; i < theHigh; i++) {
			retVal.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(Integer.toString(i)));
		}
		return retVal;
	}

	@Nonnull
	private IResourceReindexSvc.IdChunk createIdChunk(long idLow, long idHigh, Date lastDate) {
		List<ResourcePersistentId> ids = new ArrayList<>();
		List<String> resourceTypes = new ArrayList<>();
		for (long i = idLow; i < idHigh; i++) {
			ids.add(new ResourcePersistentId(i));
			resourceTypes.add("Patient");
		}
		IResourceReindexSvc.IdChunk chunk = new IResourceReindexSvc.IdChunk(ids, resourceTypes, lastDate);
		return chunk;
	}

}
@@ -14,7 +14,7 @@
	<packaging>jar</packaging>

	<name>HAPI FHIR JPA Server - Batch2 Task Processor</name>
	<description></description>
	<description>Batch2 is a framework for managing and executing long running "batch" jobs</description>

	<dependencies>
JobCoordinatorImpl.java

@@ -218,6 +218,10 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
			ourLog.error("Failure executing job {} step {}", jobDefinitionId, targetStepId, e);
			myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, e.toString());
			throw new JobExecutionFailedException(Msg.code(2041) + e.getMessage(), e);
		} catch (Throwable t) {
			ourLog.error("Unexpected failure executing job {} step {}", jobDefinitionId, targetStepId, t);
			myJobPersistence.markWorkChunkAsFailed(chunkId, t.toString());
			return false;
		}

		int recordsProcessed = outcome.getRecordsProcessed();
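This new `catch` tiering is the core of the commit's error-handling improvement: a thrown `Exception` is treated as a recoverable step failure (the work chunk is marked errored, its error count incremented, and the failure rethrown), while anything else reaching `Throwable` marks the chunk failed outright and aborts. A generic sketch of that two-tier pattern, using a stand-in persistence interface and `RuntimeException` in place of the real `IJobPersistence` and `JobExecutionFailedException`:

```java
// Stand-in for IJobPersistence; method names mirror the hunk above.
interface Persistence {
	void markWorkChunkAsErroredAndIncrementErrorCount(String chunkId, String message);
	void markWorkChunkAsFailed(String chunkId, String message);
}

class StepRunnerSketch {
	private final Persistence myJobPersistence;

	StepRunnerSketch(Persistence thePersistence) {
		myJobPersistence = thePersistence;
	}

	boolean executeStep(String chunkId, Runnable theStep) {
		try {
			theStep.run();
			return true;
		} catch (Exception e) {
			// Recoverable path: record the error against the chunk and rethrow
			// so the framework can retry or report a step failure
			myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, e.toString());
			throw new RuntimeException("Step failed: " + e.getMessage(), e);
		} catch (Throwable t) {
			// Non-recoverable path (e.g. an Error): mark the chunk failed
			// outright and tell the caller not to continue
			myJobPersistence.markWorkChunkAsFailed(chunkId, t.toString());
			return false;
		}
	}
}
```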
@@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -167,13 +167,6 @@
				<argLine>@{argLine} ${surefire_jvm_args}</argLine>
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.codehaus.mojo</groupId>
			<artifactId>animal-sniffer-maven-plugin</artifactId>
			<configuration>
				<skip>true</skip>
			</configuration>
		</plugin>
	</plugins>
	<resources>
		<resource>
pom.xml
@@ -852,8 +852,10 @@
		<fontawesomeVersion>5.4.1</fontawesomeVersion>
		<maven.compiler.source>11</maven.compiler.source>
		<maven.compiler.target>11</maven.compiler.target>
		<maven.compiler.release>11</maven.compiler.release>
		<maven.compiler.testSource>17</maven.compiler.testSource>
		<maven.compiler.testTarget>17</maven.compiler.testTarget>
		<maven.compiler.testRelease>17</maven.compiler.testRelease>
	</properties>

	<dependencyManagement>

@@ -2055,7 +2057,7 @@
	<plugin>
		<groupId>org.apache.maven.plugins</groupId>
		<artifactId>maven-compiler-plugin</artifactId>
		<version>3.8.1</version>
		<version>3.10.1</version>
		<configuration>
			<forceJavacCompilerUse>true</forceJavacCompilerUse>
			<encoding>UTF-8</encoding>

@@ -2143,11 +2145,6 @@
		<artifactId>buildnumber-maven-plugin</artifactId>
		<version>1.4</version>
	</plugin>
	<plugin>
		<groupId>org.codehaus.mojo</groupId>
		<artifactId>animal-sniffer-maven-plugin</artifactId>
		<version>1.20</version>
	</plugin>
	<plugin>
		<groupId>org.codehaus.mojo</groupId>
		<artifactId>cobertura-maven-plugin</artifactId>

@@ -2319,10 +2316,10 @@
		<version>3.5.4</version>
	</requireMavenVersion>
	<requireJavaVersion>
		<version>11</version>
		<version>17</version>
		<message>
			HAPI FHIR is still targeting JDK 8 for published binaries, but we require JDK 11
			to build and test this library.
			HAPI FHIR is targeting JDK 11 for published binaries,
			but we require JDK 17 to build and test this library.
		</message>
	</requireJavaVersion>
</rules>

@@ -2976,13 +2973,6 @@
		<execution><id>validate</id><phase>none</phase></execution>
	</executions>
</plugin>
<plugin>
	<groupId>org.codehaus.mojo</groupId>
	<artifactId>animal-sniffer-maven-plugin</artifactId>
	<executions>
		<execution><id>check-java-api</id><phase>none</phase></execution>
	</executions>
</plugin>
</plugins>
</build>
</profile>

@@ -3008,6 +2998,7 @@
<configuration>
	<source>17</source>
	<target>17</target>
	<release>17</release>
	<testSource>17</testSource>
	<testTarget>17</testTarget>
</configuration>