6_4 mergeback (#4547)

* One more fix for #4467

* Enabling massIngestionMode causes incomplete resource deletion (#4476)

* Adding initial test.

* Adding fix and subsequent test.

* Adding changelog.

---------

Co-authored-by: peartree <etienne.poirier@smilecdr.com>

* Provide the capability to request that the name of the subscription matching channel be unqualified (#4464)

* Adding initial test.

* Adding initial solution implementation.

* Adding change log and code clean up.

* addressing comments from 1st code review.

---------

Co-authored-by: peartree <etienne.poirier@smilecdr.com>

* Change visibility of migration method (#4471)

* change migration visibility

* add empty migration method for 640

---------

Co-authored-by: nathaniel.doef <nathaniel.doef@smilecdr.com>

* Fix subscription validation not to validate partition ID when invoked from an update pointcut (#4484)

* First commit:  Make SubscriptionValidatingInterceptor aware of which Pointcut is being called.  In validatePermissions(), skip determinePartition() if the Pointcut is STORAGE_PRESTORAGE_RESOURCE_UPDATED.   Fix resulting compile errors in various unit tests.

* Fix/enhance unit tests.  Mark methods as deprecated instead of deleting them.  Add proper error code.  Complete changelog.

* Remove erroneous TODOs and tweak the validation logic.

* Enhance unit tests and fix changelog.

* Reindex batch job fails when processing deleted resources. (#4482)

* adding changelog.

* Providing solution and adding changelog.

* Adding new test.

---------

Co-authored-by: peartree <etienne.poirier@smilecdr.com>

* cleaning up checkstyle files (#4470)

* cleaning up checkstyle files

* One more fix for #4467 (#4469)

* added exlusions for files at base project level so checkstyle doesn't error out

* duplicate error code from merge

* changing lifecycle goal for all module checkstyle check

* moving checkstyle to base pom file, changing exectution phase on base check, cleaning dependency, resolving duplicate error code

* wip

* trying to figure out why pipeline cannot copy files

* removing modules that don't actually need to be built.

* I messed up the version

---------

Co-authored-by: James Agnew <jamesagnew@gmail.com>

* Bump core to 5.6.881 (#4496)

* Bump core to 5.6.881-SNAPSHOT

* Work on fixing tests

* Work on fixing tests 2

* Bump to core release

---------

Co-authored-by: dotasek <david.otasek@smilecdr.com>

* Issue 4486 mdm inconsistent possible match score values (#4487)

* Extract method for readability

* Save always normalized score values in POSSIBLE_MATCH links.

* Avoid setting properties to null values. Adjust test.

* Simplify fix

* Fix test. Add RangeTestHelper.

---------

Co-authored-by: juan.marchionatto <juan.marchionatto@smilecdr.com>

* Revert "cleaning up checkstyle files (#4470)"

This reverts commit efae3b5d5f.

* core version fix

* Loosen rules for id helper

* License

* fix batch2 reduction step (#4499)

* fix bug where FINALIZE jobs are not cancellable

* moved reduction step to message hander

* moving reduction step to queue

* addingchangelog

* cleaning up

* review fixes

* review fix'

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>

* Scheduled batch 2 bulk export job and binary delete (#4492)

* First commit:  Scheduled batch 2 bulk export job delete and binary, incomplete mock-based unit test, and a mess of TODOs and code that needs to be deleted.

* Refine solution and add a concrete unit test but still work to do.

* Comment out code in cancelAndPurgeAllJobs() and see if it breaks the pipeline.

* Unit tests complete.  New Msg code for new IJobPersistence.fetchInstances() method.  Cleanup TODOs and add others.

* Finish final touches on implementation.

* Add changelog.

* Various cleanup.

* Code review feedback.

* Small tweak to changelog.

* Last code review tweak.

* Address more code review comments.

* Reverse changes to consider work chunks.  Add a constant for write-to-binary.

* Change bulk import test for valueUri type (#4503)

* change tests

* suggested test change

* CVE resolutions (#4513)

* Bump Postgres for CVE

* Bump jetty

* Verison bump

* Remove comments

* Revrt bump

* Add check in scanner (#4518)

* 4516 create hapi fhir cli command to clear stale lock entries (#4517)

* Initial implementation

* better tests

* Add changelog and docs

* Forgotten files

* Code review comments

* Fix checkstyle

* Unable to Expunge CodeSystem (#4507)

* changes for GL-3943

* changes for GL-3943

---------

Co-authored-by: isaacwen <isaac.wen@smilecdr.com>

* New line::

* Update to documentation regarding narrative generation; (#4521)

Providing changelog;

Co-authored-by: peartree <etienne.poirier@smilecdr.com>

* changed what score is set for mdmlinks that created new golden resource (#4514)

* changed what score is set for mdmlinks that created new golden resource

* fix test

---------

Co-authored-by: Long Ma <long@smilecdr.com>

* REVERT: change to operationoutcome.html

* trying to fix BulkDataExportTest testGroupBulkExportNotInGroup_DoesNo… (#4527)

* trying to fix BulkDataExportTest testGroupBulkExportNotInGroup_DoesNotShowUp

* added change log

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>

* fix build (#4530)

* Making narrative_generation.md reference an html snippet (#4531)

Co-authored-by: peartree <etienne.poirier@smilecdr.com>

* fixed the issue of meta.source field inconsistently populated in subscription messages for different requests (#4524)

* fix + test

* minor fix

* Addressing suggestion

* Minor changes

* 4441 rel 6 4 bad references creation bug (#4519)

* adding a test

* fail in the case of ref enforce on type and on write and autocreate are all true

* update to code

* removing a line

* cleanup

* removing check on urn

* changing just to trigger a build

* adding a comment to the pom

* updating test for better information

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>

* fixed channel import null pointer exception from null header (#4534)

* retryCount 0 on null header + test + changelog

* suggested changes

* Revert "fixed the issue of meta.source field inconsistently populated in subscription messages for different requests (#4524)" (#4535)

This reverts commit 53252b8d15.

* Better error handling for when channel type is not supported (#4538)

Co-authored-by: kylejule <kyle.jule@smilecdr.com>

* Avoid logging message payloads that contain sensitive data (#4537)

Don't log payloads - they may contain sensitive data.

* Bulk Export Bug With Many Resources and Low Max File Size (#4506)

* failing test

* fix + changelog

* tweak

* add method to IJobPersistence to use a Stream

* tweak

* tweak

* decrease test time

* clean up

* code review comments

* version bump

* Increase timeout limit to match BulkExportUseCaseTest

* shorten test

* maintenance pass

* add logging

* Revert "add logging"

This reverts commit b0453fd953.

* Revert "maintenance pass"

This reverts commit bbc7418d51.

* test

* trying to fix BulkDataExportTest testGroupBulkExportNotInGroup_DoesNotShowUp

* shorten tests

* logging

* move test location

* fixes a regression caused my change in hapi-fhir

* timeout

* Revert "fixes a regression caused my change in hapi-fhir"

This reverts commit 4b58013149.

* testing

* Revert "testing"

This reverts commit aafc95c2f3.

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>

* bump ver

* License updates'

* Remove checkstyle

* Verison bump

* Move migrations

* semicolon

---------

Co-authored-by: James Agnew <jamesagnew@gmail.com>
Co-authored-by: Etienne Poirier <33007955+epeartree@users.noreply.github.com>
Co-authored-by: peartree <etienne.poirier@smilecdr.com>
Co-authored-by: Nathan Doef <n.doef@protonmail.com>
Co-authored-by: nathaniel.doef <nathaniel.doef@smilecdr.com>
Co-authored-by: Luke deGruchy <luke.degruchy@smilecdr.com>
Co-authored-by: Mark Iantorno <markiantorno@gmail.com>
Co-authored-by: dotasek <dotasek.dev@gmail.com>
Co-authored-by: dotasek <david.otasek@smilecdr.com>
Co-authored-by: jmarchionatto <60409882+jmarchionatto@users.noreply.github.com>
Co-authored-by: juan.marchionatto <juan.marchionatto@smilecdr.com>
Co-authored-by: Tadgh <garygrantgraham@gmail.com>
Co-authored-by: TipzCM <leif.stawnyczy@gmail.com>
Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>
Co-authored-by: samguntersmilecdr <123124187+samguntersmilecdr@users.noreply.github.com>
Co-authored-by: Isaac Wen <76772867+isaacwen@users.noreply.github.com>
Co-authored-by: isaacwen <isaac.wen@smilecdr.com>
Co-authored-by: longma1 <32119004+longma1@users.noreply.github.com>
Co-authored-by: Long Ma <long@smilecdr.com>
Co-authored-by: Qingyixia <106992634+Qingyixia@users.noreply.github.com>
Co-authored-by: KGJ-software <39975592+KGJ-software@users.noreply.github.com>
Co-authored-by: kylejule <kyle.jule@smilecdr.com>
Co-authored-by: michaelabuckley <michaelabuckley@gmail.com>
Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2023-02-13 16:23:25 -05:00 committed by GitHub
parent 765fedfefa
commit 794e4510c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
202 changed files with 3118 additions and 670 deletions

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,14 +4,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -186,6 +186,7 @@ public abstract class BaseApp {
commands.add(new ExportConceptMapToCsvCommand());
commands.add(new ImportCsvToConceptMapCommand());
commands.add(new HapiFlywayMigrateDatabaseCommand());
commands.add(new HapiClearMigrationLockCommand());
commands.add(new CreatePackageCommand());
commands.add(new BulkImportCommand());
commands.add(new ReindexTerminologyCommand());

View File

@ -0,0 +1,92 @@
package ca.uhn.fhir.cli;
/*-
* #%L
* HAPI FHIR - Command Line Client - API
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.migrate.DriverTypeEnum;
import ca.uhn.fhir.jpa.migrate.HapiMigrator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
*
*/
public abstract class BaseClearMigrationLockCommand extends BaseCommand {
public static final String CLEAR_LOCK = "clear-migration-lock";
private String myMigrationTableName;
@Override
public String getCommandDescription() {
return "This command clears a database migration lock";
}
@Override
public String getCommandName() {
return CLEAR_LOCK;
}
@Override
public Options getOptions() {
Options retVal = new Options();
addRequiredOption(retVal, "u", "url", "URL", "The JDBC database URL");
addRequiredOption(retVal, "n", "username", "Username", "The JDBC database username");
addRequiredOption(retVal, "p", "password", "Password", "The JDBC database password");
addRequiredOption(retVal, "d", "driver", "Driver", "The database driver to use (Options are " + driverOptions() + ")");
addRequiredOption(retVal, "l", "lock-uuid", "Lock UUID", "The UUID value of the lock held in the database.");
return retVal;
}
private String driverOptions() {
return Arrays.stream(DriverTypeEnum.values()).map(Enum::name).collect(Collectors.joining(", "));
}
@Override
public void run(CommandLine theCommandLine) throws ParseException {
String url = theCommandLine.getOptionValue("u");
String username = theCommandLine.getOptionValue("n");
String password = theCommandLine.getOptionValue("p");
String lockUUID = theCommandLine.getOptionValue("l");
DriverTypeEnum driverType;
String driverTypeString = theCommandLine.getOptionValue("d");
try {
driverType = DriverTypeEnum.valueOf(driverTypeString);
} catch (Exception e) {
throw new ParseException(Msg.code(2774) + "Invalid driver type \"" + driverTypeString + "\". Valid values are: " + driverOptions());
}
DriverTypeEnum.ConnectionProperties connectionProperties = driverType.newConnectionProperties(url, username, password);
HapiMigrator migrator = new HapiMigrator(myMigrationTableName, connectionProperties.getDataSource(), driverType);
migrator.clearMigrationLockWithUUID(lockUUID);
}
protected void setMigrationTableName(String theMigrationTableName) {
myMigrationTableName = theMigrationTableName;
}
}

View File

@ -0,0 +1,33 @@
package ca.uhn.fhir.cli;
/*-
* #%L
* HAPI FHIR - Command Line Client - API
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.migrate.SchemaMigrator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
public class HapiClearMigrationLockCommand extends BaseClearMigrationLockCommand {
@Override
public void run(CommandLine theCommandLine) throws ParseException {
setMigrationTableName(SchemaMigrator.HAPI_FHIR_MIGRATION_TABLENAME);
super.run(theCommandLine);
}
}

View File

@ -1,31 +0,0 @@
package ca.uhn.fhir.cli;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
public class BaseAppTest {
private final PrintStream standardOut = System.out;
private final ByteArrayOutputStream outputStreamCaptor = new ByteArrayOutputStream();
@BeforeEach
public void setUp() {
System.setOut(new PrintStream(outputStreamCaptor));
}
@AfterEach
public void tearDown() {
System.setOut(standardOut);
}
@Test
public void testHelpOption() {
App.main(new String[]{"help", "create-package"});
assertThat(outputStreamCaptor.toString().trim(), outputStreamCaptor.toString().trim(), containsString("Usage"));
}
}

View File

@ -0,0 +1,32 @@
package ca.uhn.fhir.cli;
import org.apache.commons.io.output.TeeOutputStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
/**
* This class splits output stream to both STDOUT, and a capturing byte array output stream, which can later be inspected.
*/
public class ConsoleOutputCapturingBaseTest {
protected final ByteArrayOutputStream outputStreamCaptor = new ByteArrayOutputStream();
protected final TeeOutputStream myTeeOutputStream = new TeeOutputStream(System.out, outputStreamCaptor);
@BeforeEach
public void setUp() {
System.setOut(new PrintStream(myTeeOutputStream));
}
@AfterEach
public void tearDown() {
outputStreamCaptor.reset();
System.setOut(System.out);
}
protected String getConsoleOutput() {
return outputStreamCaptor.toString().trim();
}
}

View File

@ -0,0 +1,313 @@
package ca.uhn.fhir.cli;
import ca.uhn.fhir.jpa.migrate.DriverTypeEnum;
import ca.uhn.fhir.jpa.migrate.dao.HapiMigrationDao;
import ca.uhn.fhir.jpa.migrate.entity.HapiMigrationEntity;
import ca.uhn.fhir.system.HapiSystemProperties;
import com.google.common.base.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
import org.springframework.jdbc.support.lob.DefaultLobHandler;
import org.springframework.jdbc.support.lob.LobCreator;
import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.*;
import static ca.uhn.fhir.jpa.migrate.HapiMigrationLock.LOCK_PID;
import static ca.uhn.fhir.jpa.migrate.HapiMigrationStorageSvc.LOCK_TYPE;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.slf4j.LoggerFactory.getLogger;
public class HapiClearMigrationLockCommandTest extends ConsoleOutputCapturingBaseTest {
private static final Logger ourLog = getLogger(HapiClearMigrationLockCommandTest.class);
public static final String DB_DIRECTORY = "target/h2_test";
static {
HapiSystemProperties.enableTestMode();
}
@Test
public void testClearNonExistingLockIncorrectLock() throws IOException {
ConnectionData connectionData = createSchemaAndMigrate("test_migrate_clear_incorrect_lock");
HapiMigrationDao dao = new HapiMigrationDao(connectionData.connectionProperties.getDataSource(), DriverTypeEnum.H2_EMBEDDED, "FLY_HFJ_MIGRATION");
String correctLockUUID = UUID.randomUUID().toString();
String incorrectLockUUID = UUID.randomUUID().toString();
createAndSaveLockRow(correctLockUUID, dao);
String[] args = new String[]{
BaseClearMigrationLockCommand.CLEAR_LOCK,
"-d", "H2_EMBEDDED",
"-u", connectionData.url,
"-n", "",
"-p", "",
"-l", incorrectLockUUID
};
int beforeClearMigrationCount = dao.findAll().size();
try {
App.main(args);
fail();
} catch (CommandFailureException e) {
assertThat(e.getMessage(), containsString("HAPI-2152: Internal error: on unlocking, a competing lock was found"));
}
}
@Test
public void testClearNonExistingLockNoLocks() throws IOException {
ConnectionData connectionData = createSchemaAndMigrate("test_migrate_clear_nonexisting_lock");
HapiMigrationDao dao = new HapiMigrationDao(connectionData.connectionProperties.getDataSource(), DriverTypeEnum.H2_EMBEDDED, "FLY_HFJ_MIGRATION");
String lockUUID = UUID.randomUUID().toString();
String[] args = new String[]{
BaseClearMigrationLockCommand.CLEAR_LOCK,
"-d", "H2_EMBEDDED",
"-u", connectionData.url,
"-n", "",
"-p", "",
"-l", lockUUID
};
int beforeClearMigrationCount = dao.findAll().size();
App.main(args);
int afterClearMigrationCount = dao.findAll().size();
int removedRows = beforeClearMigrationCount - afterClearMigrationCount;
assertEquals(0, removedRows);
assertThat(getConsoleOutput(), containsString("Did not successfully remove lock entry. [uuid="+ lockUUID +"]"));
}
@Test
public void testMigrateAndClearExistingLock() throws IOException, SQLException {
ConnectionData connectionData = createSchemaAndMigrate("test_migrate_clear_existing_lock");
HapiMigrationDao dao = new HapiMigrationDao(connectionData.connectionProperties.getDataSource(), DriverTypeEnum.H2_EMBEDDED, "FLY_HFJ_MIGRATION");
String lockUUID = UUID.randomUUID().toString();
createAndSaveLockRow(lockUUID, dao);
String[] args = new String[]{
BaseClearMigrationLockCommand.CLEAR_LOCK,
"-d", "H2_EMBEDDED",
"-u", connectionData.url,
"-n", "",
"-p", "",
"-l", lockUUID
};
int beforeClearMigrationCount = dao.findAll().size();
App.main(args);
int afterClearMigrationCount = dao.findAll().size();
int removedRows = beforeClearMigrationCount - afterClearMigrationCount;
assertEquals(1, removedRows);
assertThat(getConsoleOutput(), containsString("Successfully removed lock entry. [uuid="+ lockUUID +"]"));
}
private record ConnectionData(DriverTypeEnum.ConnectionProperties connectionProperties, String url) {}
public ConnectionData createSchemaAndMigrate(String theDbName) throws IOException {
File location = getLocation(theDbName);
String url = "jdbc:h2:" + location.getAbsolutePath();
DriverTypeEnum.ConnectionProperties connectionProperties = DriverTypeEnum.H2_EMBEDDED.newConnectionProperties(url, "", "");
String initSql = "/persistence_create_h2_340.sql";
executeSqlStatements(connectionProperties, initSql);
seedDatabase340(connectionProperties);
ourLog.info("**********************************************");
ourLog.info("Done Setup, Starting Migration...");
ourLog.info("**********************************************");
String[] args = new String[]{
BaseFlywayMigrateDatabaseCommand.MIGRATE_DATABASE,
"-d", "H2_EMBEDDED",
"-u", url,
"-n", "",
"-p", "",
"-r"
};
App.main(args);
return new ConnectionData(connectionProperties, url);
}
private static void createAndSaveLockRow(String theLockUUID, HapiMigrationDao theDao) {
HapiMigrationEntity me = new HapiMigrationEntity();
me.setPid(LOCK_PID);
me.setChecksum(100);
me.setDescription(theLockUUID);
me.setSuccess(true);
me.setExecutionTime(20);
me.setInstalledBy("gary");
me.setInstalledOn(new Date());
me.setVersion("2023.1");
me.setType(LOCK_TYPE);
theDao.save(me);
}
@Nonnull
private File getLocation(String theDatabaseName) throws IOException {
File directory = new File(DB_DIRECTORY);
if (directory.exists()) {
FileUtils.deleteDirectory(directory);
}
return new File(DB_DIRECTORY + "/" + theDatabaseName);
}
private void seedDatabase340(DriverTypeEnum.ConnectionProperties theConnectionProperties) {
theConnectionProperties.getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = theConnectionProperties.newJdbcTemplate();
jdbcTemplate.execute(
"insert into HFJ_RESOURCE (RES_DELETED_AT, RES_VERSION, FORCED_ID_PID, HAS_TAGS, RES_PUBLISHED, RES_UPDATED, SP_HAS_LINKS, HASH_SHA256, SP_INDEX_STATUS, RES_LANGUAGE, SP_CMPSTR_UNIQ_PRESENT, SP_COORDS_PRESENT, SP_DATE_PRESENT, SP_NUMBER_PRESENT, SP_QUANTITY_PRESENT, SP_STRING_PRESENT, SP_TOKEN_PRESENT, SP_URI_PRESENT, RES_PROFILE, RES_TYPE, RES_VER, RES_ID) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setNull(1, Types.TIMESTAMP);
thePs.setString(2, "R4");
thePs.setNull(3, Types.BIGINT);
thePs.setBoolean(4, false);
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setTimestamp(6, new Timestamp(System.currentTimeMillis()));
thePs.setBoolean(7, false);
thePs.setNull(8, Types.VARCHAR);
thePs.setLong(9, 1L);
thePs.setNull(10, Types.VARCHAR);
thePs.setBoolean(11, false);
thePs.setBoolean(12, false);
thePs.setBoolean(13, false);
thePs.setBoolean(14, false);
thePs.setBoolean(15, false);
thePs.setBoolean(16, false);
thePs.setBoolean(17, false);
thePs.setBoolean(18, false);
thePs.setNull(19, Types.VARCHAR);
thePs.setString(20, "Patient");
thePs.setLong(21, 1L);
thePs.setLong(22, 1L);
}
}
);
jdbcTemplate.execute(
"insert into HFJ_RES_VER (RES_DELETED_AT, RES_VERSION, FORCED_ID_PID, HAS_TAGS, RES_PUBLISHED, RES_UPDATED, RES_ENCODING, RES_TEXT, RES_ID, RES_TYPE, RES_VER, PID) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setNull(1, Types.TIMESTAMP);
thePs.setString(2, "R4");
thePs.setNull(3, Types.BIGINT);
thePs.setBoolean(4, false);
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setTimestamp(6, new Timestamp(System.currentTimeMillis()));
thePs.setString(7, "JSON");
theLobCreator.setBlobAsBytes(thePs, 8, "{\"resourceType\":\"Patient\"}".getBytes(Charsets.US_ASCII));
thePs.setLong(9, 1L);
thePs.setString(10, "Patient");
thePs.setLong(11, 1L);
thePs.setLong(12, 1L);
}
}
);
jdbcTemplate.execute(
"insert into HFJ_SPIDX_STRING (SP_MISSING, SP_NAME, RES_ID, RES_TYPE, SP_UPDATED, SP_VALUE_EXACT, SP_VALUE_NORMALIZED, SP_ID) values (?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setBoolean(1, false);
thePs.setString(2, "given");
thePs.setLong(3, 1L); // res-id
thePs.setString(4, "Patient");
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setString(6, "ROBERT");
thePs.setString(7, "Robert");
thePs.setLong(8, 1L);
}
}
);
jdbcTemplate.execute(
"insert into HFJ_SPIDX_TOKEN (SP_MISSING, SP_NAME, RES_ID, RES_TYPE, SP_UPDATED, SP_SYSTEM, SP_VALUE, SP_ID) values (?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setBoolean(1, false);
thePs.setString(2, "identifier");
thePs.setLong(3, 1L); // res-id
thePs.setString(4, "Patient");
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setString(6, "http://foo");
thePs.setString(7, "12345678");
thePs.setLong(8, 1L);
}
}
);
jdbcTemplate.execute(
"insert into HFJ_SPIDX_DATE (SP_MISSING, SP_NAME, RES_ID, RES_TYPE, SP_UPDATED, SP_VALUE_HIGH, SP_VALUE_LOW, SP_ID) values (?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setBoolean(1, false);
thePs.setString(2, "birthdate");
thePs.setLong(3, 1L); // res-id
thePs.setString(4, "Patient");
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setTimestamp(6, new Timestamp(1000000000L)); // value high
thePs.setTimestamp(7, new Timestamp(1000000000L)); // value low
thePs.setLong(8, 1L);
}
}
);
return null;
});
}
private void executeSqlStatements(DriverTypeEnum.ConnectionProperties theConnectionProperties, String theInitSql) throws
IOException {
String script = IOUtils.toString(HapiClearMigrationLockCommandTest.class.getResourceAsStream(theInitSql), Charsets.UTF_8);
List<String> scriptStatements = new ArrayList<>(Arrays.asList(script.split("\n")));
for (int i = 0; i < scriptStatements.size(); i++) {
String nextStatement = scriptStatements.get(i);
if (isBlank(nextStatement)) {
scriptStatements.remove(i);
i--;
continue;
}
nextStatement = nextStatement.trim();
while (nextStatement.endsWith(";")) {
nextStatement = nextStatement.substring(0, nextStatement.length() - 1);
}
scriptStatements.set(i, nextStatement);
}
theConnectionProperties.getTxTemplate().execute(t -> {
for (String next : scriptStatements) {
theConnectionProperties.newJdbcTemplate().execute(next);
}
return null;
});
}
}

View File

@ -0,0 +1,13 @@
package ca.uhn.fhir.cli;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
public class HelpOptionTest extends ConsoleOutputCapturingBaseTest {
@Test
public void testHelpOption() {
App.main(new String[]{"help", "create-package"});
assertThat(outputStreamCaptor.toString().trim(), outputStreamCaptor.toString().trim(), containsString("Usage"));
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -42,7 +42,7 @@ FhirContext ctx = FhirContext.forDstu2();
ctx.setNarrativeGenerator(new DefaultThymeleafNarrativeGenerator());
// Encode the output, including the narrative
String output = ctx.newJsonParser().setPrettyPrint(true).encodeResourceToString(patient);
String output = ctx.newXmlParser().setPrettyPrint(true).encodeResourceToString(patient);
System.out.println(output);
//END SNIPPET: example1

View File

@ -21,6 +21,7 @@ package ca.uhn.hapi.fhir.docs;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.model.dstu2.resource.Patient;
import ca.uhn.fhir.narrative.CustomThymeleafNarrativeGenerator;
@SuppressWarnings("unused")
@ -33,7 +34,11 @@ FhirContext ctx = FhirContext.forDstu2();
String propFile = "classpath:/com/foo/customnarrative.properties";
CustomThymeleafNarrativeGenerator gen = new CustomThymeleafNarrativeGenerator(propFile);
Patient patient = new Patient();
ctx.setNarrativeGenerator(gen);
String output = ctx.newJsonParser().encodeResourceToString(patient);
System.out.println(output);
//END SNIPPET: gen

View File

@ -0,0 +1,4 @@
type: fix
issue: 3482
jira: SMILE-5076
title: "Previously, persistence modules were attempting to activate subscriptions that used channel types they did not support. This has been changed, and those subscriptions will not be activated if the given channel type is not supported"

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4090
title: "Previously, mdm links that connected resources and golden resources that were newly created had a link score of null,
this changes it to 1.0 as the golden resource should be a perfect match with the source resource it was created from."

View File

@ -0,0 +1,7 @@
---
type: fix
issue: 4441
title: "Creating a resource with an invalid embedded resource reference
would not fail. Even if IsEnforceReferentialIntegrityOnWrite was enabled.
This has been fixed, and invalid references will throw.
"

View File

@ -0,0 +1,5 @@
---
type: add
issue: 4463
jira: SMILE-4770
title: "Providing the capability to specify that the name of the subscription matching channel should be unqualified at creation time."

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4475
jira: SMILE-4961
title: "Enabling mass ingestion mode alters the resource deletion process leaving resources partially deleted. The problem has been fixed."

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4481
jira: SMILE-4961
title: "Previously, a reindex batch job would fail when executing on deleted resources. This issue has been fixed."

View File

@ -0,0 +1,5 @@
type: fix
issue: 4485
jira: SMILE-5561
title: "Cross-partition subscription PUT with a custom interceptor will fail on validation because the read partition ID is used.
This has been fixed by skipping validation if the validator invoked during an update operation"

View File

@ -0,0 +1,3 @@
type: fix
issue: 4486
title: "Previously, some MDM links of type `POSSIBLE_MATCH` were saved with unnormalized score values. This has been fixed."

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4491
title: "Batch2 Jobs in the FINALIZE state can now be
cancelled."

View File

@ -0,0 +1,8 @@
---
type: fix
issue: 4491
title: "Moved batch2 reduction step logic to the messaging queue.
Before it was executed during the maintenance run directly.
This resulted in bugs with multiple reduction steps kicking
off for long running jobs.
"

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4520
jira: SMILE-4406
title: "Updating documentation related to narrative generation."

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4500
jira: SMILE-6001
title: "Schedule bulk export job and binary was not working with relational databases. This has now been fixed with a reimplementation for batch 2."

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4508
title: "Deleting CodeSystem resources by URL then expunging would fail to
expunge and a foreign key error would be thrown. This has been fixed."

View File

@ -0,0 +1,6 @@
---
type: fix
issue: 4511
jira: SMILE-6064
title: "Previously, bulk export jobs were getting stuck in the `FINALIZE` state when performed
with many resources and a low Bulk Export File Maximum Capacity. This has been fixed."

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 4516
title: "A new command has been added to the HAPI-FHIR CLI called `clear-migration-lock`. This can be used to fix a database state which can occur if a migration is interrupted before completing."

View File

@ -0,0 +1,7 @@
---
type: fix
issue: 4526
title: "Fixing an issue where a long running reduction step causes
the message not to be processed fast enough, thereby allowing
multiple reduction step jobs to start.
"

View File

@ -0,0 +1,7 @@
---
type: fix
issue: 4533
jira: SMILE-5554
title: "Previously, if a message with a null header was sent to a Channel Import module and failed,
a NullPointerException would occur and the consumer would become unable to receive any further messages.
This has now been fixed."

View File

@ -0,0 +1,5 @@
---
type: change
issue: 4537
title: "ResourceDeliveryMessage no longer includes the payload in toString().
This avoids leaking sensitive data to logs and other channels."

View File

@ -2,7 +2,7 @@
HAPI provides several ways to add [Narrative Text](http://hl7.org/fhir/narrative.html) to your encoded messages.
The simplest way is to simply place the narrative text directly in the resource via the `setDivAsString()` method.
The simplest way is to place the narrative text directly in the resource via the `setDivAsString()` method.
```java
{{snippet:classpath:/ca/uhn/hapi/fhir/docs/Narrative.java|simple}}
@ -18,7 +18,7 @@ HAPI's built-in narrative generation uses the [Thymeleaf](http://www.thymeleaf.o
## A Simple Example
Activating HAPI's built-in narrative generator is as simple as calling [setNarrativeGenerator](/hapi-fhir/apidocs/hapi-fhir-base/ca/uhn/fhir/context/FhirContext.html#setNarrativeGenerator(ca.uhn.fhir.narrative.INarrativeGenerator).
Activating HAPI's built-in narrative generator is as simple as calling [setNarrativeGenerator](/hapi-fhir/apidocs/hapi-fhir-base/ca/uhn/fhir/context/FhirContext.html#setNarrativeGenerator(ca.uhn.fhir.narrative.INarrativeGenerator)).
```java
{{snippet:classpath:/ca/uhn/hapi/fhir/docs/Narrative.java|example1}}
@ -54,8 +54,8 @@ Note that these templates expect a few specific CSS definitions to be present in
To use your own templates for narrative generation, simply create one or more templates, using the Thymeleaf HTML based syntax.
```java
{{snippet:classpath:/ca/uhn/fhir/narrative/OperationOutcome.html}}
```html
{{snippet:classpath:/ca/uhn/hapi/fhir/docs/snippet/OperationOutcome.html}}
```
Then create a properties file which describes your templates. In this properties file, each resource to be defined has a pair or properties.
@ -79,13 +79,16 @@ vitalsigns.profile=http://hl7.org/fhir/StructureDefinition/vitalsigns
vitalsigns.narrative=classpath:com/example/narrative/Observation_Vitals.html
```
You may also override/define behaviour for datatypes and other structures. These datatype narrative definitions will be used as content within <code>th:narrative</code> blocks in resource templates. See the example resource template above for an example.
You may also override/define behaviour for datatypes and other structures. These datatype narrative definitions will be used as content within <code>th:narrative</code> blocks in resource templates. See the [example above](#creating-your-own-templates).
```properties
# You can create a template based on a type name
quantity.dataType=Quantity
quantity.narrative=classpath:com/example/narrative/Quantity.html
string.dataType=String
string.narrative=classpath:com/example/narrative/String.html
# Or by class name, which can be useful for custom datatypes and structures
custom_extension.class=com.example.model.MyCustomExtension
custom_extension.narrative=classpath:com/example/narrative/CustomExtension.html
@ -105,13 +108,13 @@ Thymeleaf has a concept called Fragments, which allow reusable template portions
{{snippet:classpath:ca/uhn/fhir/narrative/narrative-with-fragment.properties}}
```
The following template declares a fragment (this is `narrative-with-fragment-child.html` in the example above):
The following template declares `Fragment1` and `Fragment2` as part of file `narrative-with-fragment-child.html`:
```html
{{snippet:classpath:ca/uhn/fhir/narrative/narrative-with-fragment-child.html}}
```
And the following template uses it (this is `narrative-with-fragment-child.html` in the example above):
And the following parent template (`narrative-with-fragment-parent.html`) imports `Fragment1` with parameter 'blah':
```html
{{snippet:classpath:ca/uhn/fhir/narrative/narrative-with-fragment-parent.html}}

View File

@ -0,0 +1,23 @@
<html>
<head>
<link rel="stylesheet" type="text/css" href="narrative.css"/>
</head>
<body>
<!--*/-->
<div>
<h1>Operation Outcome</h1>
<table border="0">
<tr th:each="issue : ${resource.issue}">
<td th:text="${issue.severityElement.value}" style="font-weight: bold;"></td>
<td th:text="${issue.location}"></td>
<td th:narrative="${issue.diagnostics}"></td>
</tr>
</table>
</div>
<!--*/-->
</body>
</html>

View File

@ -101,6 +101,22 @@ The `migrate-database` command may be used to Migrate a database schema when upg
See [Upgrading HAPI FHIR JPA](/docs/server_jpa/upgrading.html) for information on how to use this command.
# Clear Migration lock
the `clear-migration-lock` command should be used if an upgrade to HAPI-FHIR failed during a migration. The migration system creates a lock row when it begins. If the migration is cancelled before it finishes, the system will be left in an inconsistent state. In order to resume the migration, the lock row must be removed. From your migration logs, you will see a line which looks like the following:
```text
Migration Lock Row added. [uuid=05931c87-c2a4-49d6-8d82-d8ce09fdd8ef]
```
In order to clear this migration lock, you can run:
```bash
clear-migration-lock --lock-uuid 05931c87-c2a4-49d6-8d82-d8ce09fdd8ef
```
# Reindex Terminology
The `reindex-terminology` command may be used to recreate freetext indexes for terminology resources.

View File

@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -29,13 +29,13 @@ import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
@ -59,6 +59,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -135,6 +136,12 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return entity.getId();
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchInstances(String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(theJobDefinitionId, theStatuses, theCutoff, thePageable));
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
@ -326,13 +333,21 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
/**
* Deprecated, use {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#fetchAllWorkChunksForStepStream(String, String)}
* Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
*/
@Override
@Deprecated
public Iterator<WorkChunk> fetchAllWorkChunksForStepIterator(String theInstanceId, String theStepId) {
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunksForStep(theInstanceId, theStepId, theBatchSize, thePageIndex, theConsumer));
}
@Override
@Transactional(propagation = Propagation.MANDATORY)
public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
return myWorkChunkRepository.fetchChunksForStep(theInstanceId, theStepId).map((entity) -> toChunk(entity, true));
}
/**
* Update the stored instance
*

View File

@ -20,38 +20,47 @@ package ca.uhn.fhir.jpa.bulk.export.svc;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.jobs.export.BulkExportAppCtx;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportBinaryFileId;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.JsonUtil;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.slf4j.LoggerFactory.getLogger;
@ -59,26 +68,24 @@ import static org.slf4j.LoggerFactory.getLogger;
public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJobSchedulingHelper, IHasScheduledJobs {
private static final Logger ourLog = getLogger(BulkDataExportJobSchedulingHelperImpl.class);
@Autowired
private DaoRegistry myDaoRegistry;
private final DaoRegistry myDaoRegistry;
@Autowired
private IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Autowired
private PlatformTransactionManager myTxManager;
private final PlatformTransactionManager myTxManager;
private TransactionTemplate myTxTemplate;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
private final DaoConfig myDaoConfig;
private final BulkExportHelperService myBulkExportHelperSvc;
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private BulkExportHelperService myBulkExportHelperSvc;
private final IJobPersistence myJpaJobPersistence;
public BulkDataExportJobSchedulingHelperImpl(DaoRegistry theDaoRegistry, PlatformTransactionManager theTxManager, DaoConfig theDaoConfig, BulkExportHelperService theBulkExportHelperSvc, IJobPersistence theJpaJobPersistence, TransactionTemplate theTxTemplate) {
myDaoRegistry = theDaoRegistry;
myTxManager = theTxManager;
myDaoConfig = theDaoConfig;
myBulkExportHelperSvc = theBulkExportHelperSvc;
myJpaJobPersistence = theJpaJobPersistence;
myTxTemplate = theTxTemplate;
}
@PostConstruct
public void start() {
@ -97,15 +104,10 @@ public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJob
@Override
@Transactional(propagation = Propagation.NEVER)
public synchronized void cancelAndPurgeAllJobs() {
myTxTemplate.execute(t -> {
ourLog.info("Deleting all files");
myBulkExportCollectionFileDao.deleteAllFiles();
ourLog.info("Deleting all collections");
myBulkExportCollectionDao.deleteAllFiles();
ourLog.info("Deleting all jobs");
myBulkExportJobDao.deleteAllFiles();
return null;
});
// This is called by unit test code that also calls ExpungeEverythingService,
// which explicitly deletes both Batch2WorkChunkEntity and Batch2JobInstanceEntity, as well as ResourceTable, in
// which Binary's are stored
// Long story short, this method no longer needs to do anything
}
/**
@ -116,51 +118,111 @@ public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJob
@Override
public void purgeExpiredFiles() {
if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
ourLog.debug("bulk export disabled: doing nothing");
return;
}
Optional<BulkExportJobEntity> jobToDelete = myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
Slice<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findNotRunningByExpiry(page, new Date());
if (submittedJobs.isEmpty()) {
return Optional.empty();
}
return Optional.of(submittedJobs.getContent().get(0));
});
final List<JobInstance> jobInstancesToDelete = myTxTemplate.execute(t ->
myJpaJobPersistence.fetchInstances(Batch2JobDefinitionConstants.BULK_EXPORT,
StatusEnum.getEndedStatuses(),
computeCutoffFromConfig(),
PageRequest.of(0, 50))
);
if (jobToDelete.isPresent()) {
ourLog.info("Deleting bulk export job: {}", jobToDelete.get());
if (jobInstancesToDelete == null || jobInstancesToDelete.isEmpty()) {
ourLog.debug("No batch 2 bulk export jobs found! Nothing to do!");
ourLog.info("Finished bulk export job deletion with nothing to do");
return;
}
for (JobInstance jobInstance : jobInstancesToDelete) {
ourLog.info("Deleting batch 2 bulk export job: {}", jobInstance);
myTxTemplate.execute(t -> {
BulkExportJobEntity job = myBulkExportJobDao.getOne(jobToDelete.get().getId());
for (BulkExportCollectionEntity nextCollection : job.getCollections()) {
for (BulkExportCollectionFileEntity nextFile : nextCollection.getFiles()) {
final Optional<JobInstance> optJobInstanceForInstanceId = myJpaJobPersistence.fetchInstance(jobInstance.getInstanceId());
ourLog.info("Purging bulk data file: {}", nextFile.getResourceId());
IIdType id = myBulkExportHelperSvc.toId(nextFile.getResourceId());
getBinaryDao().delete(id, new SystemRequestDetails());
getBinaryDao().forceExpungeInExistingTransaction(id, new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true), new SystemRequestDetails());
myBulkExportCollectionFileDao.deleteByPid(nextFile.getId());
}
myBulkExportCollectionDao.deleteByPid(nextCollection.getId());
if (optJobInstanceForInstanceId.isEmpty()) {
ourLog.error("Can't find job instance for ID: {} despite having retrieved it in the first step", jobInstance.getInstanceId());
return null;
}
ourLog.debug("*** About to delete job with ID {}", job.getId());
myBulkExportJobDao.deleteByPid(job.getId());
final JobInstance jobInstanceForInstanceId = optJobInstanceForInstanceId.get();
ourLog.info("Deleting bulk export job: {}", jobInstanceForInstanceId);
// We need to keep these for investigation but we also need a process to manually delete these jobs once we're done investigating
if (StatusEnum.FAILED == jobInstanceForInstanceId.getStatus()) {
ourLog.info("skipping because the status is FAILED for ID: {}" + jobInstanceForInstanceId.getInstanceId());
return null;
}
purgeBinariesIfNeeded(jobInstanceForInstanceId, jobInstanceForInstanceId.getReport());
final String batch2BulkExportJobInstanceId = jobInstanceForInstanceId.getInstanceId();
ourLog.debug("*** About to delete batch 2 bulk export job with ID {}", batch2BulkExportJobInstanceId);
myJpaJobPersistence.deleteInstanceAndChunks(batch2BulkExportJobInstanceId);
ourLog.info("Finished deleting bulk export job: {}", jobInstance.getInstanceId());
return null;
});
ourLog.info("Finished deleting bulk export job: {}", jobToDelete.get());
ourLog.info("Finished deleting bulk export jobs");
}
}
private void purgeBinariesIfNeeded(JobInstance theJobInstanceForInstanceId, String theJobInstanceReportString) {
final Optional<BulkExportJobResults> optBulkExportJobResults = getBulkExportJobResults(theJobInstanceReportString);
if (optBulkExportJobResults.isPresent()) {
final BulkExportJobResults bulkExportJobResults = optBulkExportJobResults.get();
ourLog.debug("job: {} resource type to binary ID: {}", theJobInstanceForInstanceId.getInstanceId(), bulkExportJobResults.getResourceTypeToBinaryIds());
final Map<String, List<String>> resourceTypeToBinaryIds = bulkExportJobResults.getResourceTypeToBinaryIds();
for (String resourceType : resourceTypeToBinaryIds.keySet()) {
final List<String> binaryIds = resourceTypeToBinaryIds.get(resourceType);
for (String binaryId : binaryIds) {
ourLog.info("Purging batch 2 bulk export binary: {}", binaryId);
IIdType id = myBulkExportHelperSvc.toId(binaryId);
getBinaryDao().delete(id, new SystemRequestDetails());
}
}
} // else we can't know what the binary IDs are, so delete this job and move on
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary");
return myDaoRegistry.getResourceDao(Binary.class.getSimpleName());
}
@Nonnull
private Optional<BulkExportJobResults> getBulkExportJobResults(String theJobInstanceReportString) {
if (StringUtils.isBlank(theJobInstanceReportString)) {
ourLog.error(String.format("Cannot parse job report string because it's null or blank: %s", theJobInstanceReportString));
return Optional.empty();
}
try {
return Optional.of(JsonUtil.deserialize(theJobInstanceReportString, BulkExportJobResults.class));
} catch (Exception theException) {
ourLog.error(String.format("Cannot parse job report string: %s", theJobInstanceReportString), theException);
return Optional.empty();
}
}
@Nonnull
private Date computeCutoffFromConfig() {
final int bulkExportFileRetentionPeriodHours = myDaoConfig.getBulkExportFileRetentionPeriodHours();
final LocalDateTime cutoffLocalDateTime = LocalDateTime.now()
.minusHours(bulkExportFileRetentionPeriodHours);
return Date.from(cutoffLocalDateTime
.atZone(ZoneId.systemDefault())
.toInstant());
}
public static class PurgeExpiredFilesJob implements HapiJob {
@Autowired
private IBulkDataExportJobSchedulingHelper myTarget;

View File

@ -1,41 +0,0 @@
package ca.uhn.fhir.jpa.bulk.export.svc;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class BulkExportCollectionFileDaoSvc {
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Transactional
public void save(BulkExportCollectionFileEntity theBulkExportCollectionEntity) {
myBulkExportCollectionFileDao.saveAndFlush(theBulkExportCollectionEntity);
}
}

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeJobSubmitterImpl;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
@ -16,6 +17,7 @@ import ca.uhn.fhir.jpa.binary.provider.BinaryAccessProvider;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportJobSchedulingHelperImpl;
import ca.uhn.fhir.jpa.bulk.export.svc.BulkExportHelperService;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl;
import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
@ -161,6 +163,7 @@ import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Nullable;
import java.io.IOException;
@ -451,8 +454,8 @@ public class JpaConfig {
}
@Bean
public IBulkDataExportJobSchedulingHelper bulkDataExportJobSchedulingHelper() {
return new BulkDataExportJobSchedulingHelperImpl();
public IBulkDataExportJobSchedulingHelper bulkDataExportJobSchedulingHelper(DaoRegistry theDaoRegistry, PlatformTransactionManager theTxManager, DaoConfig theDaoConfig, BulkExportHelperService theBulkExportHelperSvc, IJobPersistence theJpaJobPersistence) {
return new BulkDataExportJobSchedulingHelperImpl(theDaoRegistry, theTxManager, theDaoConfig, theBulkExportHelperSvc, theJpaJobPersistence, null);
}
@Bean

View File

@ -143,7 +143,7 @@ import java.util.Set;
import java.util.StringTokenizer;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME;
import static java.util.Objects.nonNull;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -601,7 +601,11 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
} else {
theEntity.setHashSha256(null);
if(nonNull(theEntity.getHashSha256())){
theEntity.setHashSha256(null);
changed = true;
}
resourceBinary = null;
resourceText = null;
encoding = ResourceEncodingEnum.DEL;
@ -939,7 +943,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
entity.setUpdated(theDeletedTimestampOrNull);
entity.setNarrativeText(null);
entity.setContentText(null);
entity.setHashSha256(null);
entity.setIndexStatus(INDEX_STATUS_INDEXED);
changed = populateResourceIntoEntity(theTransactionDetails, theRequest, theResource, entity, true);

View File

@ -681,6 +681,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
// Perform delete
preDelete(resourceToDelete, entity, theRequest);
updateEntityForDelete(theRequest, transactionDetails, entity);
resourceToDelete.setId(entity.getIdDt());

View File

@ -28,6 +28,7 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Date;
import java.util.List;
import java.util.Set;
@ -60,6 +61,14 @@ public interface IBatch2JobInstanceRepository extends JpaRepository<Batch2JobIns
Pageable thePageable
);
@Query("SELECT b from Batch2JobInstanceEntity b WHERE b.myDefinitionId = :defId AND b.myStatus IN( :stats ) AND b.myEndTime < :cutoff")
List<Batch2JobInstanceEntity> findInstancesByJobIdAndStatusAndExpiry(
@Param("defId") String theDefinitionId,
@Param("stats") Set<StatusEnum> theStatus,
@Param("cutoff") Date theCutoff,
Pageable thePageable
);
@Query("SELECT e FROM Batch2JobInstanceEntity e WHERE e.myDefinitionId = :jobDefinitionId AND e.myStatus IN :statuses")
List<Batch2JobInstanceEntity> fetchInstancesByJobDefinitionIdAndStatus(@Param("jobDefinitionId") String theJobDefinitionId, @Param("statuses") Set<StatusEnum> theIncompleteStatuses, Pageable thePageRequest);

View File

@ -30,6 +30,7 @@ import org.springframework.data.repository.query.Param;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChunkEntity, String>, IHapiFhirJpaRepository {
@ -39,9 +40,16 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
/**
* Deprecated, use {@link ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository#fetchChunksForStep(String, String)}
*/
@Deprecated
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
List<Batch2WorkChunkEntity> fetchChunksForStep(Pageable thePageRequest, @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
Stream<Batch2WorkChunkEntity> fetchChunksForStep(@Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myRecordsProcessed = :rp, e.mySerializedData = null WHERE e.myId = :id")
void updateChunkStatusAndClearDataForEndSuccess(@Param("id") String theChunkId, @Param("et") Date theEndTime, @Param("rp") int theRecordsProcessed, @Param("status") StatusEnum theInProgress);

View File

@ -1,39 +0,0 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
/*
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
@Deprecated
public interface IBulkExportCollectionDao extends JpaRepository<BulkExportCollectionEntity, Long>, IHapiFhirJpaRepository {
@Modifying
@Query("DELETE FROM BulkExportCollectionEntity t")
void deleteAllFiles();
@Modifying
@Query("DELETE FROM BulkExportCollectionEntity t WHERE t.myId = :pid")
void deleteByPid(@Param("pid") Long theId);
}

View File

@ -1,39 +0,0 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
/*
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public interface IBulkExportCollectionFileDao extends JpaRepository<BulkExportCollectionFileEntity, Long>, IHapiFhirJpaRepository {
@Modifying
@Query("DELETE FROM BulkExportCollectionFileEntity t")
void deleteAllFiles();
@Modifying
@Query("DELETE FROM BulkExportCollectionFileEntity t WHERE t.myId = :pid")
void deleteByPid(@Param("pid") Long theId);
}

View File

@ -34,25 +34,6 @@ import java.util.Optional;
public interface IBulkExportJobDao extends JpaRepository<BulkExportJobEntity, Long>, IHapiFhirJpaRepository {
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myJobId = :jobid")
Optional<BulkExportJobEntity> findByJobId(@Param("jobid") String theUuid);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myStatus = :status")
Slice<BulkExportJobEntity> findByStatus(Pageable thePage, @Param("status") BulkExportJobStatusEnum theSubmitted);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myExpiry < :cutoff")
Slice<BulkExportJobEntity> findByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myExpiry IS NOT NULL and j.myExpiry < :cutoff AND j.myStatus <> 'BUILDING'")
Slice<BulkExportJobEntity> findNotRunningByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff);
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myRequest = :request AND j.myCreated > :createdAfter AND j.myStatus <> :status ORDER BY j.myCreated DESC")
Slice<BulkExportJobEntity> findExistingJob(Pageable thePage, @Param("request") String theRequest, @Param("createdAfter") Date theCreatedAfter, @Param("status") BulkExportJobStatusEnum theNotStatus);
@Modifying
@Query("DELETE FROM BulkExportJobEntity t")
void deleteAllFiles();
@Modifying
@Query("DELETE FROM BulkExportJobEntity t WHERE t.myId = :pid")
void deleteByPid(@Param("pid") Long theId);

View File

@ -0,0 +1,74 @@
package ca.uhn.fhir.jpa.dao.index;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;
public class ExtractInlineReferenceParams {
private IBaseResource myResource;
private TransactionDetails myTransactionDetails;
private RequestDetails myRequestDetails;
private boolean myFailOnInvalidReferences;
public ExtractInlineReferenceParams(
IBaseResource theResource,
TransactionDetails theTransactionDetails,
RequestDetails theRequest
) {
myResource = theResource;
myTransactionDetails = theTransactionDetails;
myRequestDetails = theRequest;
}
public IBaseResource getResource() {
return myResource;
}
public void setResource(IBaseResource theResource) {
myResource = theResource;
}
public TransactionDetails getTransactionDetails() {
return myTransactionDetails;
}
public void setTransactionDetails(TransactionDetails theTransactionDetails) {
myTransactionDetails = theTransactionDetails;
}
public RequestDetails getRequestDetails() {
return myRequestDetails;
}
public void setRequestDetails(RequestDetails theRequestDetails) {
myRequestDetails = theRequestDetails;
}
public boolean isFailOnInvalidReferences() {
return myFailOnInvalidReferences;
}
public void setFailOnInvalidReferences(boolean theFailOnInvalidReferences) {
myFailOnInvalidReferences = theFailOnInvalidReferences;
}
}

View File

@ -150,6 +150,9 @@ public class IdHelperService implements IIdHelperService<JpaPid> {
assert myDontCheckActiveTransactionForUnitTest || TransactionSynchronizationManager.isSynchronizationActive();
assert theRequestPartitionId != null;
if (theResourceId.contains("/")) {
theResourceId = theResourceId.substring(theResourceId.indexOf("/") + 1);
}
IdDt id = new IdDt(theResourceType, theResourceId);
Map<String, List<IResourceLookup<JpaPid>>> matches = translateForcedIdToPids(theRequestPartitionId,
Collections.singletonList(id),

View File

@ -113,7 +113,9 @@ public class SearchParamWithInlineReferencesExtractor {
}
public void populateFromResource(RequestPartitionId theRequestPartitionId, ResourceIndexedSearchParams theParams, TransactionDetails theTransactionDetails, ResourceTable theEntity, IBaseResource theResource, ResourceIndexedSearchParams theExistingParams, RequestDetails theRequest, boolean theFailOnInvalidReference) {
extractInlineReferences(theResource, theTransactionDetails, theRequest);
ExtractInlineReferenceParams theExtractParams = new ExtractInlineReferenceParams(theResource, theTransactionDetails, theRequest);
theExtractParams.setFailOnInvalidReferences(theFailOnInvalidReference);
extractInlineReferences(theExtractParams);
mySearchParamExtractorService.extractFromResource(theRequestPartitionId, theRequest, theParams, theExistingParams, theEntity, theResource, theTransactionDetails, theFailOnInvalidReference);
@ -188,16 +190,32 @@ public class SearchParamWithInlineReferencesExtractor {
myContext = theContext;
}
@Deprecated
public void extractInlineReferences(
IBaseResource theResource,
TransactionDetails theTransactionDetails,
RequestDetails theRequest
) {
extractInlineReferences(new ExtractInlineReferenceParams(theResource, theTransactionDetails, theRequest));
}
/**
* Handle references within the resource that are match URLs, for example references like "Patient?identifier=foo". These match URLs are resolved and replaced with the ID of the
* Handle references within the resource that are match URLs, for example references like "Patient?identifier=foo".
* These match URLs are resolved and replaced with the ID of the
* matching resource.
*
* This method is *only* called from UPDATE path
*/
public void extractInlineReferences(IBaseResource theResource, TransactionDetails theTransactionDetails, RequestDetails theRequest) {
public void extractInlineReferences(ExtractInlineReferenceParams theParams) {
if (!myDaoConfig.isAllowInlineMatchUrlReferences()) {
return;
}
IBaseResource resource = theParams.getResource();
RequestDetails theRequest = theParams.getRequestDetails();
TransactionDetails theTransactionDetails = theParams.getTransactionDetails();
FhirTerser terser = myContext.newTerser();
List<IBaseReference> allRefs = terser.getAllPopulatedChildElementsOfType(theResource, IBaseReference.class);
List<IBaseReference> allRefs = terser.getAllPopulatedChildElementsOfType(resource, IBaseReference.class);
for (IBaseReference nextRef : allRefs) {
IIdType nextId = nextRef.getReferenceElement();
String nextIdText = nextId.getValue();
@ -229,7 +247,6 @@ public class SearchParamWithInlineReferencesExtractor {
JpaPid match;
if (matches.isEmpty()) {
Optional<IBasePersistedResource> placeholderOpt = myDaoResourceLinkResolver.createPlaceholderTargetIfConfiguredToDoSo(matchResourceType, nextRef, null, theRequest, theTransactionDetails);
if (placeholderOpt.isPresent()) {
match = (JpaPid) placeholderOpt.get().getPersistentId();

View File

@ -87,9 +87,30 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
init610();
init620();
init630();
init640();
init660();
}
private void init630() {
protected void init660() {
Builder version = forVersion(VersionEnum.V6_6_0);
// fix Postgres clob types - that stupid oid driver problem is still there
// BT2_JOB_INSTANCE.PARAMS_JSON_LOB
version.onTable("BT2_JOB_INSTANCE")
.migratePostgresTextClobToBinaryClob("20230208.1", "PARAMS_JSON_LOB");
// BT2_JOB_INSTANCE.REPORT
version.onTable("BT2_JOB_INSTANCE")
.migratePostgresTextClobToBinaryClob("20230208.2", "REPORT");
// BT2_WORK_CHUNK.CHUNK_DATA
version.onTable("BT2_WORK_CHUNK")
.migratePostgresTextClobToBinaryClob("20230208.3", "CHUNK_DATA");
}
protected void init640() {
}
protected void init630() {
Builder version = forVersion(VersionEnum.V6_3_0);
// start forced_id inline migration
@ -108,17 +129,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
.online(true)
.withColumns("SEARCH_PID")
.onlyAppliesToPlatforms(NON_AUTOMATIC_FK_INDEX_PLATFORMS);
// fix Postgres clob types - that stupid oid driver problem is still there
// BT2_JOB_INSTANCE.PARAMS_JSON_LOB
version.onTable("BT2_JOB_INSTANCE")
.migratePostgresTextClobToBinaryClob("20230208.1", "PARAMS_JSON_LOB");
// BT2_JOB_INSTANCE.REPORT
version.onTable("BT2_JOB_INSTANCE")
.migratePostgresTextClobToBinaryClob("20230208.2", "REPORT");
// BT2_WORK_CHUNK.CHUNK_DATA
version.onTable("BT2_WORK_CHUNK")
.migratePostgresTextClobToBinaryClob("20230208.3", "CHUNK_DATA");
;
}

View File

@ -51,6 +51,7 @@ import javax.annotation.Nullable;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
@ -99,8 +100,9 @@ public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
List<IResourcePersistentId> ids = dao.searchForIds(searchParamMap, request);
Date lastDate = null;
if (ids.size() > 0) {
lastDate = dao.readByPid(ids.get(ids.size() - 1)).getMeta().getLastUpdated();
if (isNotEmpty(ids)) {
IResourcePersistentId lastResourcePersistentId = ids.get(ids.size() - 1);
lastDate = dao.readByPid(lastResourcePersistentId, true).getMeta().getLastUpdated();
}
return new HomogeneousResourcePidList(resourceType, ids, lastDate);

View File

@ -0,0 +1,382 @@
package ca.uhn.fhir.jpa.bulk.export.svc;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.JsonUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.OngoingStubbing;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.util.Pair;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class BulkDataExportJobSchedulingHelperImplTest {
@Mock
private DaoConfig myDaoConfig;
@Mock
private PlatformTransactionManager myTxManager;
@Mock
private TransactionTemplate myTxTemplate;
@Mock
private IJobPersistence myJpaJobPersistence;
@Mock
private BulkExportHelperService myBulkExportHelperSvc;
@Mock
private DaoRegistry myDaoRegistry;
@Mock
private IFhirResourceDao<IBaseBinary> myBinaryDao;
@Captor
private ArgumentCaptor<Date> myCutoffCaptor;
private BulkDataExportJobSchedulingHelperImpl myBulkDataExportJobSchedulingHelper;
private final FhirContext myFhirContext = FhirContext.forR4Cached();
@Test
public void testPurgeExpiredFilesDisabledDoesNothing() {
setupTestDisabled();
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
verify(myJpaJobPersistence, never()).fetchInstance(anyString());
verify(myBulkExportHelperSvc, never()).toId(anyString());
verify(myJpaJobPersistence, never()).deleteInstanceAndChunks(anyString());
}
@Test
public void purgeExpiredFilesNothingToDeleteOneHourRetention() {
final int expectedRetentionHours = 1;
setupTestEnabled(expectedRetentionHours, List.of());
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
verify(myJpaJobPersistence, never()).fetchInstance(anyString());
verify(myBulkExportHelperSvc, never()).toId(anyString());
verify(myBinaryDao, never()).delete(any(IIdType.class), any(SystemRequestDetails.class));
verify(myJpaJobPersistence, never()).deleteInstanceAndChunks(anyString());
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.SECOND), DateUtils.truncate(cutoffDate, Calendar.SECOND));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryOneHourRetention_NULL_reportString() {
final int expectedRetentionHours = 1;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
jobInstances.get(0).setReport(null);
setupTestEnabledNoBinaries(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
verify(myBulkExportHelperSvc, never()).toId(anyString());
verify(myBinaryDao, never()).delete(any(IIdType.class), any(SystemRequestDetails.class));
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryOneHourRetention_BAD_reportString() {
final int expectedRetentionHours = 1;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
jobInstances.get(0).setReport("{garbage}");
setupTestEnabledNoBinaries(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
verify(myBulkExportHelperSvc, never()).toId(anyString());
verify(myBinaryDao, never()).delete(any(IIdType.class), any(SystemRequestDetails.class));
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryOneHourRetention() {
final int expectedRetentionHours = 1;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryOneHourRetentionStatusFailed() {
final int expectedRetentionHours = 1;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesSingleJobSingleBinaryTwoHourRetention() {
final int expectedRetentionHours = 2;
final int numBinariesPerJob = 1;
final List<JobInstance> jobInstances = getJobInstances(numBinariesPerJob, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesMultipleJobsMultipleBinariesTwoHourRetention() {
final int expectedRetentionHours = 2;
final int numBinariesPerJob = 3;
final List<JobInstance> jobInstances = getJobInstances( numBinariesPerJob, StatusEnum.COMPLETED, StatusEnum.COMPLETED, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Test
public void purgeExpiredFilesMultipleJobsMultipleBinariesTwoHourRetentionMixedStatuses() {
final int expectedRetentionHours = 2;
final int numBinariesPerJob = 3;
final List<JobInstance> jobInstances = getJobInstances( numBinariesPerJob, StatusEnum.COMPLETED, StatusEnum.FAILED, StatusEnum.COMPLETED);
setupTestEnabled(expectedRetentionHours, jobInstances);
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
for (JobInstance jobInstance : jobInstances) {
verify(myJpaJobPersistence).fetchInstance(jobInstance.getInstanceId());
if (StatusEnum.FAILED != jobInstance.getStatus()) {
for (int index = 0; index < numBinariesPerJob; index++) {
verify(myBulkExportHelperSvc).toId(jobInstance.getInstanceId() + "-binary-" + index);
verify(myBinaryDao).delete(eq(toId(jobInstance.getInstanceId() + "-binary-" + index)), any(SystemRequestDetails.class));
}
verify(myJpaJobPersistence).deleteInstanceAndChunks(jobInstance.getInstanceId());
}
}
final Date cutoffDate = myCutoffCaptor.getValue();
assertEquals(DateUtils.truncate(computeDateFromConfig(expectedRetentionHours), Calendar.MINUTE), DateUtils.truncate(cutoffDate, Calendar.MINUTE));
}
@Nonnull
private static List<JobInstance> getJobInstances(int theNumBinaries, StatusEnum... theStatusEnums) {
return IntStream.range(0, theStatusEnums.length)
.mapToObj(index -> Pair.of(index, theStatusEnums[index]))
.map(pair -> {
final JobInstance jobInstance = new JobInstance();
final StatusEnum status = pair.getSecond();
final String instanceId = status.name() + pair.getFirst();
jobInstance.setInstanceId(instanceId);
jobInstance.setReport(serialize(getBulkExportJobResults(instanceId, theNumBinaries)));
jobInstance.setStatus(status);
return jobInstance;
}).toList();
}
private static String serialize(BulkExportJobResults theBulkExportJobResults) {
return JsonUtil.serialize(theBulkExportJobResults);
}
@Nonnull
private static BulkExportJobResults getBulkExportJobResults(String theInstanceId, int theNumBinaries) {
final BulkExportJobResults bulkExportJobResults = new BulkExportJobResults();
bulkExportJobResults.setResourceTypeToBinaryIds(Map.of("Patient",
IntStream.range(0, theNumBinaries)
.mapToObj(theInt -> theInstanceId + "-binary-" + theInt)
.toList()));
return bulkExportJobResults;
}
@Nonnull
private Date computeDateFromConfig(int theExpectedRetentionHours) {
return Date.from(LocalDateTime.now()
.minusHours(theExpectedRetentionHours)
.atZone(ZoneId.systemDefault())
.toInstant());
}
private void setupTestDisabled() {
setupTest(false, -1, List.of(), false);
}
private void setupTestEnabled(int theRetentionHours, List<JobInstance> theJobInstances) {
setupTest(true, theRetentionHours, theJobInstances, true);
}
private void setupTestEnabledNoBinaries(int theRetentionHours, List<JobInstance> theJobInstances) {
setupTest(true, theRetentionHours, theJobInstances, false);
}
private void setupTest(boolean theIsEnabled, int theRetentionHours, List<JobInstance> theJobInstances, boolean theIsEnableBinaryMocks) {
myBulkDataExportJobSchedulingHelper = new BulkDataExportJobSchedulingHelperImpl(myDaoRegistry, myTxManager, myDaoConfig, myBulkExportHelperSvc, myJpaJobPersistence, myTxTemplate);
when(myDaoConfig.isEnableTaskBulkExportJobExecution()).thenReturn(theIsEnabled);
if (!theIsEnabled) {
return;
}
final Answer<List<JobInstance>> fetchInstancesAnswer = theInvocationOnMock -> {
final TransactionCallback<List<JobInstance>> transactionCallback = theInvocationOnMock.getArgument(0);
return transactionCallback.doInTransaction(null);
};
final Answer<Void> purgeExpiredJobsAnswer = theInvocationOnMock -> {
final TransactionCallback<Optional<JobInstance>> transactionCallback = theInvocationOnMock.getArgument(0);
transactionCallback.doInTransaction(null);
return null;
};
when(myJpaJobPersistence.fetchInstances(eq(Batch2JobDefinitionConstants.BULK_EXPORT),
eq(StatusEnum.getEndedStatuses()),
myCutoffCaptor.capture(),
any(PageRequest.class)))
.thenReturn(theJobInstances);
when(myTxTemplate.execute(any()))
.thenAnswer(fetchInstancesAnswer).thenAnswer(purgeExpiredJobsAnswer);
when(myDaoConfig.getBulkExportFileRetentionPeriodHours())
.thenReturn(theRetentionHours);
if (theJobInstances.isEmpty()) {
return;
}
OngoingStubbing<Optional<JobInstance>> when = when(myJpaJobPersistence.fetchInstance(anyString()));
for (JobInstance jobInstance : theJobInstances) {
when = when.thenReturn(Optional.of(jobInstance));
}
if (!theIsEnableBinaryMocks) {
return;
}
when(myBulkExportHelperSvc.toId(anyString()))
.thenAnswer(theInvocationOnMock -> toId(theInvocationOnMock.getArgument(0)));
when(myDaoRegistry.getResourceDao(Binary.class.getSimpleName())).thenReturn(myBinaryDao);
}
private IIdType toId(String theResourceId) {
final IIdType retVal = myFhirContext.getVersion().newIdType();
retVal.setValue(theResourceId);
return retVal;
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -3,7 +3,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -73,11 +73,9 @@ public class MdmLinkDaoSvc<P extends IResourcePersistentId, M extends IMdmLink<P
mdmLink.setEidMatch(theMatchOutcome.isEidMatch() | mdmLink.isEidMatchPresent());
mdmLink.setHadToCreateNewGoldenResource(theMatchOutcome.isCreatedNewResource() | mdmLink.getHadToCreateNewGoldenResource());
mdmLink.setMdmSourceType(myFhirContext.getResourceType(theSourceResource));
if (mdmLink.getScore() != null) {
mdmLink.setScore(Math.max(theMatchOutcome.score, mdmLink.getScore()));
} else {
mdmLink.setScore(theMatchOutcome.score);
}
setScoreProperties(theMatchOutcome, mdmLink);
// Add partition for the mdm link if it's available in the source resource
RequestPartitionId partitionId = (RequestPartitionId) theSourceResource.getUserData(Constants.RESOURCE_PARTITION_ID);
if (partitionId != null && partitionId.getFirstPartitionIdOrNull() != null) {
@ -91,6 +89,24 @@ public class MdmLinkDaoSvc<P extends IResourcePersistentId, M extends IMdmLink<P
return mdmLink;
}
private void setScoreProperties(MdmMatchOutcome theMatchOutcome, M mdmLink) {
if (theMatchOutcome.getScore() != null) {
mdmLink.setScore( mdmLink.getScore() != null
? Math.max(theMatchOutcome.getNormalizedScore(), mdmLink.getScore())
: theMatchOutcome.getNormalizedScore() );
}
if (theMatchOutcome.getVector() != null) {
mdmLink.setVector( mdmLink.getVector() != null
? Math.max(theMatchOutcome.getVector(), mdmLink.getVector())
: theMatchOutcome.getVector() );
}
mdmLink.setRuleCount( mdmLink.getRuleCount() != null
? Math.max(theMatchOutcome.getMdmRuleCount(), mdmLink.getRuleCount())
: theMatchOutcome.getMdmRuleCount() );
}
@Nonnull
public M getOrCreateMdmLinkByGoldenResourceAndSourceResource(
IAnyResource theGoldenResource, IAnyResource theSourceResource
@ -127,7 +143,6 @@ public class MdmLinkDaoSvc<P extends IResourcePersistentId, M extends IMdmLink<P
* @param theSourceResourcePid The ResourcepersistenceId of the Source resource
* @return The {@link IMdmLink} entity that matches these criteria if exists
*/
@SuppressWarnings("unchecked")
public Optional<M> getLinkByGoldenResourcePidAndSourceResourcePid(P theGoldenResourcePid, P theSourceResourcePid) {
if (theSourceResourcePid == null || theGoldenResourcePid == null) {
return Optional.empty();

View File

@ -94,7 +94,7 @@ public class MdmMatchLinkSvc {
private void handleMdmWithMultipleCandidates(IAnyResource theResource, CandidateList theCandidateList, MdmTransactionContext theMdmTransactionContext) {
MatchedGoldenResourceCandidate firstMatch = theCandidateList.getFirstMatch();
IResourcePersistentId sampleGoldenResourcePid = firstMatch.getCandidateGoldenResourcePid();
IResourcePersistentId<?> sampleGoldenResourcePid = firstMatch.getCandidateGoldenResourcePid();
boolean allSameGoldenResource = theCandidateList.stream()
.allMatch(candidate -> candidate.getCandidateGoldenResourcePid().equals(sampleGoldenResourcePid));
@ -105,17 +105,7 @@ public class MdmMatchLinkSvc {
log(theMdmTransactionContext, "MDM received multiple match candidates, that were linked to different Golden Resources. Setting POSSIBLE_DUPLICATES and POSSIBLE_MATCHES.");
//Set them all as POSSIBLE_MATCH
List<IAnyResource> goldenResources = new ArrayList<>();
for (MatchedGoldenResourceCandidate matchedGoldenResourceCandidate : theCandidateList.getCandidates()) {
IAnyResource goldenResource = myMdmGoldenResourceFindingSvc
.getGoldenResourceFromMatchedGoldenResourceCandidate(matchedGoldenResourceCandidate, theMdmTransactionContext.getResourceType());
MdmMatchOutcome outcome = new MdmMatchOutcome(matchedGoldenResourceCandidate.getMatchResult().vector,
matchedGoldenResourceCandidate.getMatchResult().getNormalizedScore());
outcome.setMatchResultEnum(MdmMatchResultEnum.POSSIBLE_MATCH);
outcome.setEidMatch(theCandidateList.isEidMatch());
myMdmLinkSvc.updateLink(goldenResource, theResource, outcome, MdmLinkSourceEnum.AUTO, theMdmTransactionContext);
goldenResources.add(goldenResource);
}
List<IAnyResource> goldenResources = createPossibleMatches(theResource, theCandidateList, theMdmTransactionContext);
//Set all GoldenResources as POSSIBLE_DUPLICATE of the last GoldenResource.
IAnyResource firstGoldenResource = goldenResources.get(0);
@ -129,6 +119,26 @@ public class MdmMatchLinkSvc {
}
}
private List<IAnyResource> createPossibleMatches(IAnyResource theResource, CandidateList theCandidateList, MdmTransactionContext theMdmTransactionContext) {
List<IAnyResource> goldenResources = new ArrayList<>();
for (MatchedGoldenResourceCandidate matchedGoldenResourceCandidate : theCandidateList.getCandidates()) {
IAnyResource goldenResource = myMdmGoldenResourceFindingSvc
.getGoldenResourceFromMatchedGoldenResourceCandidate(matchedGoldenResourceCandidate, theMdmTransactionContext.getResourceType());
MdmMatchOutcome outcome = new MdmMatchOutcome(matchedGoldenResourceCandidate.getMatchResult().getVector(),
matchedGoldenResourceCandidate.getMatchResult().getScore())
.setMdmRuleCount( matchedGoldenResourceCandidate.getMatchResult().getMdmRuleCount());
outcome.setMatchResultEnum(MdmMatchResultEnum.POSSIBLE_MATCH);
outcome.setEidMatch(theCandidateList.isEidMatch());
myMdmLinkSvc.updateLink(goldenResource, theResource, outcome, MdmLinkSourceEnum.AUTO, theMdmTransactionContext);
goldenResources.add(goldenResource);
}
return goldenResources;
}
private void handleMdmWithNoCandidates(IAnyResource theResource, MdmTransactionContext theMdmTransactionContext) {
log(theMdmTransactionContext, String.format("There were no matched candidates for MDM, creating a new %s Golden Resource.", theResource.getIdElement().getResourceType()));
IAnyResource newGoldenResource = myGoldenResourceHelper.createGoldenResourceFromMdmSourceResource(theResource, theMdmTransactionContext);

View File

@ -569,6 +569,10 @@ abstract public class BaseMdmR4Test extends BaseJpaR4Test {
assertFields(MdmLink::getEidMatch, theExpectedValues);
}
protected void assertLinksMatchScore(Double... theExpectedValues) {
assertFields(MdmLink::getScore, theExpectedValues);
}
public SearchParameterMap buildGoldenResourceSearchParameterMap() {
SearchParameterMap spMap = new SearchParameterMap();
spMap.setLoadSynchronous(true);

View File

@ -100,7 +100,6 @@ public class MdmLinkDaoSvcTest extends BaseMdmR4Test {
mdmLink.setUpdated(new Date());
mdmLink.setGoldenResourcePersistenceId(JpaPid.fromId(thePatientPid));
mdmLink.setSourcePersistenceId(runInTransaction(()->myIdHelperService.getPidOrNull(RequestPartitionId.allPartitions(), patient)));
MdmLink saved= myMdmLinkDao.save(mdmLink);
return saved;
return myMdmLinkDao.save(mdmLink);
}
}

View File

@ -10,6 +10,7 @@ import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.test.utilities.RangeTestHelper;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.StringUtils;
@ -76,9 +77,12 @@ public class MdmProviderQueryLinkR4Test extends BaseLinkR4Test {
JpaPid sourcePatient2Pid = runInTransaction(()->myIdHelperService.getPidOrNull(RequestPartitionId.allPartitions(), sourcePatient2));
MdmLink possibleDuplicateMdmLink = (MdmLink) myMdmLinkDaoSvc.newMdmLink();
possibleDuplicateMdmLink.setGoldenResourcePersistenceId(sourcePatient1Pid);
possibleDuplicateMdmLink.setSourcePersistenceId(sourcePatient2Pid);
possibleDuplicateMdmLink.setMatchResult(MdmMatchResultEnum.POSSIBLE_DUPLICATE).setLinkSource(MdmLinkSourceEnum.AUTO);
possibleDuplicateMdmLink.setGoldenResourcePersistenceId(sourcePatient1Pid)
.setSourcePersistenceId(sourcePatient2Pid)
.setMatchResult(MdmMatchResultEnum.POSSIBLE_DUPLICATE)
.setLinkSource(MdmLinkSourceEnum.AUTO)
.setScore(1.0)
.setRuleCount(1L);
saveLink(possibleDuplicateMdmLink);
}
@ -89,7 +93,7 @@ public class MdmProviderQueryLinkR4Test extends BaseLinkR4Test {
List<Parameters.ParametersParameterComponent> list = getParametersByName(result, "link");
assertThat(list, hasSize(1));
List<Parameters.ParametersParameterComponent> part = list.get(0).getPart();
assertMdmLink(MDM_LINK_PROPERTY_COUNT, part, mySourcePatientId.getValue(), myPatientId.getValue(), MdmMatchResultEnum.POSSIBLE_MATCH, "false", "true", null);
assertMdmLink(MDM_LINK_PROPERTY_COUNT, part, mySourcePatientId.getValue(), myPatientId.getValue(), MdmMatchResultEnum.POSSIBLE_MATCH, "false", "true", "1");
}
@Test
@ -99,7 +103,7 @@ public class MdmProviderQueryLinkR4Test extends BaseLinkR4Test {
List<Parameters.ParametersParameterComponent> list = getParametersByName(result, "link");
assertThat("All resources with Patient type found", list, hasSize(3));
List<Parameters.ParametersParameterComponent> part = list.get(0).getPart();
assertMdmLink(MDM_LINK_PROPERTY_COUNT, part, mySourcePatientId.getValue(), myPatientId.getValue(), MdmMatchResultEnum.POSSIBLE_MATCH, "false", "true", null);
assertMdmLink(MDM_LINK_PROPERTY_COUNT, part, mySourcePatientId.getValue(), myPatientId.getValue(), MdmMatchResultEnum.POSSIBLE_MATCH, "false", "true", "1");
}
@ -377,7 +381,7 @@ public class MdmProviderQueryLinkR4Test extends BaseLinkR4Test {
List<Parameters.ParametersParameterComponent> list = getParametersByName(result, "link");
assertThat(list, hasSize(4));
List<Parameters.ParametersParameterComponent> part = list.get(3).getPart();
assertMdmLink(MDM_LINK_PROPERTY_COUNT, part, goldenResourceId.getValue(), patientId.getValue(), MdmMatchResultEnum.MATCH, "false", "false", "2");
assertMdmLink(MDM_LINK_PROPERTY_COUNT, part, goldenResourceId.getValue(), patientId.getValue(), MdmMatchResultEnum.MATCH, "false", "false", ".666");
}
@Test
@ -459,7 +463,7 @@ public class MdmProviderQueryLinkR4Test extends BaseLinkR4Test {
assertThat(thePart.get(5).getValue().primitiveValue(), is(theNewGoldenResource));
assertThat(thePart.get(6).getName(), is("score"));
assertThat(thePart.get(6).getValue().primitiveValue(), is(theScore));
RangeTestHelper.checkInRange(theScore, thePart.get(6).getValue().primitiveValue());
}
}

View File

@ -16,6 +16,7 @@ import java.util.List;
import static ca.uhn.fhir.mdm.api.MdmMatchResultEnum.MATCH;
import static ca.uhn.fhir.mdm.api.MdmMatchResultEnum.NO_MATCH;
import static ca.uhn.fhir.mdm.api.MdmMatchResultEnum.POSSIBLE_MATCH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -52,6 +53,23 @@ class MdmLinkUpdaterSvcImplTest extends BaseMdmR4Test {
assertLinksMatchedByEid(false, false);
}
@Test
public void testUpdateLinkPossibleMatchSavesNormalizedScore() {
final Patient goldenPatient = createGoldenPatient(buildJanePatient());
final Patient patient1 = createPatient(buildJanePatient());
buildUpdateLinkMdmTransactionContext();
MdmMatchOutcome matchOutcome = new MdmMatchOutcome(61L, 5.0).setMdmRuleCount(6).setMatchResultEnum(POSSIBLE_MATCH);
myMdmLinkDaoSvc.createOrUpdateLinkEntity(goldenPatient, patient1, matchOutcome, MdmLinkSourceEnum.MANUAL, createContextForCreate("Patient"));
final List<MdmLink> targets = myMdmLinkDaoSvc.findMdmLinksByGoldenResource(goldenPatient);
assertFalse(targets.isEmpty());
assertEquals(1, targets.size());
final MdmLink mdmLink = targets.get(0);
assertEquals(matchOutcome.getNormalizedScore(), mdmLink.getScore());
}
@Test
public void testUpdateLinkMatchAfterVersionChange() {
myMdmSettings.getMdmRules().setVersion("1");

View File

@ -68,6 +68,7 @@ public class MdmMatchLinkSvcTest extends BaseMdmR4Test {
assertLinksMatchResult(MATCH);
assertLinksCreatedNewResource(true);
assertLinksMatchedByEid(false);
assertLinksMatchScore(1.0);
}
@Test
@ -79,6 +80,7 @@ public class MdmMatchLinkSvcTest extends BaseMdmR4Test {
assertLinksMatchResult(MATCH);
assertLinksCreatedNewResource(true);
assertLinksMatchedByEid(false);
assertLinksMatchScore(1.0);
}
@Test
@ -93,6 +95,7 @@ public class MdmMatchLinkSvcTest extends BaseMdmR4Test {
assertLinksMatchResult(MATCH, MATCH);
assertLinksCreatedNewResource(true, true);
assertLinksMatchedByEid(false, false);
assertLinksMatchScore(1.0, 1.0);
}
@Test
@ -107,6 +110,7 @@ public class MdmMatchLinkSvcTest extends BaseMdmR4Test {
assertLinksMatchResult(MATCH, MATCH);
assertLinksCreatedNewResource(true, false);
assertLinksMatchedByEid(false, false);
assertLinksMatchScore(1.0, 2.0/3.0);
}
@Test

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -1818,10 +1818,11 @@ public abstract class BaseSearchParamExtractor implements ISearchParamExtractor
nextId = valueRef.getResource().getIdElement();
}
if (nextId == null ||
nextId.isEmpty() ||
nextId.getValue().startsWith("urn:")) {
// Ignore placeholder references
if (
nextId == null ||
nextId.isEmpty()
) {
// Ignore placeholder references that are blank
} else if (!theWantLocalReferences && nextId.getValue().startsWith("#")) {
// Ignore local refs unless we specifically want them
} else {

View File

@ -105,8 +105,10 @@ public class SearchParamExtractorService {
}
/**
* This method is responsible for scanning a resource for all of the search parameter instances. I.e. for all search parameters defined for
* a given resource type, it extracts the associated indexes and populates {@literal theParams}.
* This method is responsible for scanning a resource for all of the search parameter instances.
* I.e. for all search parameters defined for
* a given resource type, it extracts the associated indexes and populates
* {@literal theParams}.
*/
public void extractFromResource(RequestPartitionId theRequestPartitionId, RequestDetails theRequestDetails, ResourceIndexedSearchParams theNewParams, ResourceIndexedSearchParams theExistingParams, ResourceTable theEntity, IBaseResource theResource, TransactionDetails theTransactionDetails, boolean theFailOnInvalidReference) {

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -1,6 +1,8 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.IHapiBootOrder;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import org.slf4j.Logger;
@ -45,12 +47,14 @@ public class MatchingQueueSubscriberLoader {
private SubscriptionRegisteringSubscriber mySubscriptionRegisteringSubscriber;
@Autowired
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
@Autowired
private DaoConfig myDaoConfig;
@EventListener(ContextRefreshedEvent.class)
@Order(IHapiBootOrder.SUBSCRIPTION_MATCHING_CHANNEL_HANDLER)
public void subscribeToMatchingChannel() {
if (myMatchingChannel == null) {
myMatchingChannel = mySubscriptionChannelFactory.newMatchingReceivingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME, null);
myMatchingChannel = mySubscriptionChannelFactory.newMatchingReceivingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME, getChannelConsumerSettings());
}
if (myMatchingChannel != null) {
myMatchingChannel.subscribe(mySubscriptionMatchingSubscriber);
@ -60,6 +64,12 @@ public class MatchingQueueSubscriberLoader {
}
}
private ChannelConsumerSettings getChannelConsumerSettings() {
ChannelConsumerSettings channelConsumerSettings = new ChannelConsumerSettings();
channelConsumerSettings.setQualifyChannelName(myDaoConfig.isQualifySubscriptionMatchingChannelName());
return channelConsumerSettings;
}
@SuppressWarnings("unused")
@PreDestroy
public void stop() throws Exception {

View File

@ -32,6 +32,7 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.SubscriptionUtil;
import org.hl7.fhir.dstu2.model.Subscription;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -144,4 +145,9 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
}
}
public boolean isChannelTypeSupported(IBaseResource theSubscription) {
Subscription.SubscriptionChannelType channelType = mySubscriptionCanonicalizer.getChannelType(theSubscription).toCanonical();
return myDaoConfig.getSupportedSubscriptionTypes().contains(channelType);
}
}

View File

@ -228,14 +228,24 @@ public class SubscriptionLoader implements IResourceChangeListener {
* @return true if activated
*/
private boolean activateSubscriptionIfRequested(IBaseResource theSubscription) {
boolean successfullyActivated = false;
if (SubscriptionConstants.REQUESTED_STATUS.equals(mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription))) {
// internally, subscriptions that cannot activate will be set to error
if (mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(theSubscription)) {
return true;
if (mySubscriptionActivatingInterceptor.isChannelTypeSupported(theSubscription)) {
// internally, subscriptions that cannot activate will be set to error
if (mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(theSubscription)) {
successfullyActivated = true;
} else {
logSubscriptionNotActivatedPlusErrorIfPossible(theSubscription);
}
} else {
ourLog.debug("Could not activate subscription {} because channel type {} is not supported.",
theSubscription.getIdElement(),
mySubscriptionCanonicalizer.getChannelType(theSubscription));
}
logSubscriptionNotActivatedPlusErrorIfPossible(theSubscription);
}
return false;
return successfullyActivated;
}
/**

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
@ -81,7 +82,7 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
return;
}
if (myMatchingChannel == null) {
myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME, null);
myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME, getChannelProducerSettings());
}
}
@ -166,6 +167,12 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
}
private ChannelProducerSettings getChannelProducerSettings() {
ChannelProducerSettings channelProducerSettings= new ChannelProducerSettings();
channelProducerSettings.setQualifyChannelName(myDaoConfig.isQualifySubscriptionMatchingChannelName());
return channelProducerSettings;
}
public void setFhirContext(FhirContext theCtx) {
myFhirContext = theCtx;
}

View File

@ -70,12 +70,12 @@ public class SubscriptionValidatingInterceptor {
@Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED)
public void resourcePreCreate(IBaseResource theResource, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
validateSubmittedSubscription(theResource, theRequestDetails, theRequestPartitionId);
validateSubmittedSubscription(theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED);
}
@Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED)
public void resourcePreCreate(IBaseResource theOldResource, IBaseResource theResource, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
validateSubmittedSubscription(theResource, theRequestDetails, theRequestPartitionId);
public void resourceUpdated(IBaseResource theOldResource, IBaseResource theResource, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
validateSubmittedSubscription(theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED);
}
@Autowired
@ -83,14 +83,30 @@ public class SubscriptionValidatingInterceptor {
myFhirContext = theFhirContext;
}
// This will be deleted once the next snapshot (6.3.15) is published
@Deprecated
public void validateSubmittedSubscription(IBaseResource theSubscription) {
validateSubmittedSubscription(theSubscription, null, null);
validateSubmittedSubscription(theSubscription, null, null, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED);
}
// This will be deleted once the next snapshot (6.3.15) is published
@Deprecated(since="6.3.14")
public void validateSubmittedSubscription(IBaseResource theSubscription,
RequestDetails theRequestDetails,
RequestPartitionId theRequestPartitionId) {
validateSubmittedSubscription(theSubscription, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED);
}
@VisibleForTesting
void validateSubmittedSubscription(IBaseResource theSubscription,
RequestDetails theRequestDetails,
RequestPartitionId theRequestPartitionId,
Pointcut thePointcut) {
if (Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED != thePointcut && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED != thePointcut) {
throw new UnprocessableEntityException(Msg.code(2267) + "Expected Pointcut to be either STORAGE_PRESTORAGE_RESOURCE_CREATED or STORAGE_PRESTORAGE_RESOURCE_UPDATED but was: " + thePointcut);
}
if (!"Subscription".equals(myFhirContext.getResourceType(theSubscription))) {
return;
}
@ -117,7 +133,7 @@ public class SubscriptionValidatingInterceptor {
break;
}
validatePermissions(theSubscription, subscription, theRequestDetails, theRequestPartitionId);
validatePermissions(theSubscription, subscription, theRequestDetails, theRequestPartitionId, thePointcut);
mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, null);
@ -151,13 +167,18 @@ public class SubscriptionValidatingInterceptor {
protected void validatePermissions(IBaseResource theSubscription,
CanonicalSubscription theCanonicalSubscription,
RequestDetails theRequestDetails,
RequestPartitionId theRequestPartitionId) {
RequestPartitionId theRequestPartitionId,
Pointcut thePointcut) {
// If the subscription has the cross partition tag
if (SubscriptionUtil.isCrossPartition(theSubscription) && !(theRequestDetails instanceof SystemRequestDetails)) {
if (!myDaoConfig.isCrossPartitionSubscriptionEnabled()){
throw new UnprocessableEntityException(Msg.code(2009) + "Cross partition subscription is not enabled on this server");
}
if (theRequestPartitionId == null && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED == thePointcut) {
return;
}
// if we have a partition id already, we'll use that
// otherwise we might end up with READ and CREATE pointcuts
// returning conflicting partitions (say, all vs default)

View File

@ -134,6 +134,9 @@ public class SubscriptionLoaderTest {
when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(any(IBaseResource.class)))
.thenReturn(false);
when(mySubscriptionActivatingInterceptor.isChannelTypeSupported(any(IBaseResource.class)))
.thenReturn(true);
when(mySubscriptionCanonicalizer.getSubscriptionStatus(any())).thenReturn(SubscriptionConstants.REQUESTED_STATUS);
// test

View File

@ -0,0 +1,60 @@
package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hl7.fhir.dstu2.model.Subscription.SubscriptionChannelType.RESTHOOK;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SubscriptionMatcherInterceptorTest {
@Mock
DaoConfig myDaoConfig;
@Mock
SubscriptionChannelFactory mySubscriptionChannelFactory;
@InjectMocks
SubscriptionMatcherInterceptor myUnitUnderTest;
@Captor
ArgumentCaptor<ChannelProducerSettings> myArgumentCaptor;
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testMethodStartIfNeeded_withQualifySubscriptionMatchingChannelNameProperty_mayQualifyChannelName(boolean theIsQualifySubMatchingChannelName){
// given
boolean expectedResult = theIsQualifySubMatchingChannelName;
when(myDaoConfig.isQualifySubscriptionMatchingChannelName()).thenReturn(theIsQualifySubMatchingChannelName);
when(myDaoConfig.getSupportedSubscriptionTypes()).thenReturn(Set.of(RESTHOOK));
// when
myUnitUnderTest.startIfNeeded();
// then
ChannelProducerSettings capturedChannelProducerSettings = getCapturedChannelProducerSettings();
assertThat(capturedChannelProducerSettings.isQualifyChannelName(), is(expectedResult));
}
private ChannelProducerSettings getCapturedChannelProducerSettings(){
verify(mySubscriptionChannelFactory).newMatchingSendingChannel(anyString(), myArgumentCaptor.capture());
return myArgumentCaptor.getValue();
}
}

View File

@ -1,12 +1,14 @@
package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.BeforeEach;
@ -20,6 +22,8 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Nonnull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.fail;
@ -50,7 +54,7 @@ public class SubscriptionValidatingInterceptorTest {
public void testEmptySub() {
try {
Subscription badSub = new Subscription();
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(8) + "Can not process submitted Subscription - Subscription.status must be populated on this server"));
@ -63,7 +67,7 @@ public class SubscriptionValidatingInterceptorTest {
try {
Subscription badSub = new Subscription();
badSub.setStatus(Subscription.SubscriptionStatus.ACTIVE);
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(11) + "Subscription.criteria must be populated"));
@ -76,7 +80,7 @@ public class SubscriptionValidatingInterceptorTest {
Subscription badSub = new Subscription();
badSub.setStatus(Subscription.SubscriptionStatus.ACTIVE);
badSub.setCriteria("Patient");
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(14) + "Subscription.criteria must be in the form \"{Resource Type}?[params]\""));
@ -89,7 +93,7 @@ public class SubscriptionValidatingInterceptorTest {
Subscription badSub = new Subscription();
badSub.setStatus(Subscription.SubscriptionStatus.ACTIVE);
badSub.setCriteria("Patient?");
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(20) + "Subscription.channel.type must be populated"));
@ -104,7 +108,7 @@ public class SubscriptionValidatingInterceptorTest {
badSub.setCriteria("Patient?");
Subscription.SubscriptionChannelComponent channel = badSub.getChannel();
channel.setType(Subscription.SubscriptionChannelType.MESSAGE);
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(16) + "No endpoint defined for message subscription"));
@ -121,7 +125,7 @@ public class SubscriptionValidatingInterceptorTest {
channel.setEndpoint("foo");
try {
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(17) + "Only 'channel' protocol is supported for Subscriptions with channel type 'message'"));
@ -129,7 +133,7 @@ public class SubscriptionValidatingInterceptorTest {
channel.setEndpoint("channel");
try {
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(17) + "Only 'channel' protocol is supported for Subscriptions with channel type 'message'"));
@ -137,7 +141,7 @@ public class SubscriptionValidatingInterceptorTest {
channel.setEndpoint("channel:");
try {
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(19) + "Invalid subscription endpoint uri channel:"));
@ -145,7 +149,36 @@ public class SubscriptionValidatingInterceptorTest {
// Happy path
channel.setEndpoint("channel:my-queue-name");
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
}
@Test
public void testSubscriptionUpdate() {
final Subscription subscription = createSubscription();
// Assert there is no Exception thrown here.
mySubscriptionValidatingInterceptor.resourceUpdated(subscription, subscription, null, null);
}
@Test
public void testInvalidPointcut() {
try {
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(createSubscription(), null, null, Pointcut.TEST_RB);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(2267) + "Expected Pointcut to be either STORAGE_PRESTORAGE_RESOURCE_CREATED or STORAGE_PRESTORAGE_RESOURCE_UPDATED but was: " + Pointcut.TEST_RB));
}
}
@Nonnull
private static Subscription createSubscription() {
final Subscription subscription = new Subscription();
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
subscription.setCriteria("Patient?");
final Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
channel.setEndpoint("channel");
return subscription;
}
@Configuration

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -95,6 +95,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
myPort = myServer.getPort();
myServerBase = myServer.getBaseUrl();
myClient = myServer.getFhirClient();
myClient.setEncoding(EncodingEnum.JSON);
myRestServer = myServer.getRestfulServer();
myClient.getInterceptorService().unregisterInterceptorsIf(t -> t instanceof LoggingInterceptor);

View File

@ -430,7 +430,7 @@ public class ResourceProviderDstu3Test extends BaseResourceProviderDstu3Test {
String respString = myClient.transaction().withBundle(input).prettyPrint().execute();
ourLog.debug(respString);
Bundle bundle = myFhirContext.newXmlParser().parseResource(Bundle.class, respString);
Bundle bundle = myFhirContext.newJsonParser().parseResource(Bundle.class, respString);
IdType id = new IdType(bundle.getEntry().get(0).getResponse().getLocation());
Basic basic = myClient.read().resource(Basic.class).withId(id).execute();
@ -1098,7 +1098,7 @@ public class ResourceProviderDstu3Test extends BaseResourceProviderDstu3Test {
//@formatter:on
fail();
} catch (PreconditionFailedException e) {
assertEquals("HTTP 412 Precondition Failed: " + Msg.code(962) + "Failed to DELETE resource with match URL \"Patient?identifier=testDeleteConditionalMultiple\" because this search matched 2 resources",
assertEquals("HTTP 412 Precondition Failed: " + Msg.code(962) + "Failed to DELETE resource with match URL \"Patient?identifier=testDeleteConditionalMultiple&_format=json\" because this search matched 2 resources",
e.getMessage());
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.5.0-SNAPSHOT</version>
<version>6.5.1-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -34,6 +34,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -45,11 +47,13 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME;
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -216,9 +220,44 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myLastStepLatch.awaitExpected();
}
@Test
public void testJobDefinitionWithReductionStepIT() throws InterruptedException {
private void createThreeStepReductionJob(
String theJobId,
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> theSecondStep,
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> theReductionsStep
) {
// create job definition (it's the test method's name)
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addIntermediateStep("SECOND",
"Second step",
SecondStepOutput.class,
theSecondStep)
.addFinalReducerStep(
LAST_STEP_ID,
"Test last step",
ReductionStepOutput.class,
theReductionsStep
)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testJobDefinitionWithReductionStepIT(boolean theDelayReductionStepBool) throws InterruptedException {
// setup
String jobId = new Exception().getStackTrace()[0].getMethodName() + "_" + theDelayReductionStepBool;
String testInfo = "test";
AtomicInteger secondStepInt = new AtomicInteger();
@ -235,6 +274,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
SecondStepOutput output = new SecondStepOutput();
output.setValue(testInfo + secondStepInt.getAndIncrement());
sink.accept(output);
return RunOutcome.SUCCESS;
};
@ -243,63 +283,66 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
private final ArrayList<SecondStepOutput> myOutput = new ArrayList<>();
private final AtomicBoolean myBoolean = new AtomicBoolean();
private final AtomicInteger mySecondGate = new AtomicInteger();
@Override
public ChunkOutcome consume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails) {
myOutput.add(theChunkDetails.getData());
// 1 because we know 2 packets are coming.
// we'll fire the second maintenance run on the second packet
// which should cause multiple maintenance runs to run simultaneously
if (theDelayReductionStepBool && mySecondGate.getAndIncrement() == 1) {
ourLog.info("SECOND FORCED MAINTENANCE PASS FORCED");
myBatch2JobHelper.forceRunMaintenancePass();
}
return ChunkOutcome.SUCCESS();
}
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink) throws JobExecutionFailedException {
theDataSink.accept(new ReductionStepOutput(myOutput));
callLatch(myLastStepLatch, theStepExecutionDetails);
public RunOutcome run(
@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
) throws JobExecutionFailedException {
boolean isRunAlready = myBoolean.getAndSet(true);
assertFalse(isRunAlready, "Reduction step should only be called once!");
complete(theStepExecutionDetails, theDataSink);
return RunOutcome.SUCCESS;
}
};
// create job definition
String jobId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(jobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
first
)
.addIntermediateStep("SECOND",
"Second step",
SecondStepOutput.class,
second)
.addFinalReducerStep(
LAST_STEP_ID,
"Test last step",
ReductionStepOutput.class,
last
)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
private void complete(
@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
) {
assertTrue(myBoolean.get());
theDataSink.accept(new ReductionStepOutput(myOutput));
callLatch(myLastStepLatch, theStepExecutionDetails);
}
};
createThreeStepReductionJob(jobId, first, second, last);
// run test
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
String instanceId = startResponse.getJobId();
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
// wait for last step to finish
ourLog.info("Setting last step latch");
myLastStepLatch.setExpectedCount(1);
// waiting
myBatch2JobHelper.awaitJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
ourLog.info("awaited the last step");
// verify
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);

View File

@ -21,8 +21,11 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import javax.annotation.Nonnull;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
@ -133,6 +136,86 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
});
}
@Test
public void testFetchInstanceWithStatusAndCutoff_statues() {
myCaptureQueriesListener.clear();
final String completedId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 1);
final String failedId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.FAILED, 1);
final String erroredId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.ERRORED, 1);
final String cancelledId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.CANCELLED, 1);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.QUEUED, 1);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.IN_PROGRESS, 1);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.FINALIZE, 1);
final LocalDateTime cutoffLocalDateTime = LocalDateTime.now()
.minusMinutes(0);
final Date cutoffDate = Date.from(cutoffLocalDateTime
.atZone(ZoneId.systemDefault())
.toInstant());
final List<JobInstance> jobInstancesByCutoff =
mySvc.fetchInstances(JOB_DEFINITION_ID, StatusEnum.getEndedStatuses(), cutoffDate, PageRequest.of(0, 100));
assertEquals(Set.of(completedId, failedId, erroredId, cancelledId),
jobInstancesByCutoff.stream()
.map(JobInstance::getInstanceId)
.collect(Collectors.toUnmodifiableSet()));
}
@Test
public void testFetchInstanceWithStatusAndCutoff_cutoffs() {
myCaptureQueriesListener.clear();
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 3);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 4);
final String sevenMinutesAgoId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 7);
final String eightMinutesAgoId = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 8);
final LocalDateTime cutoffLocalDateTime = LocalDateTime.now()
.minusMinutes(6);
final Date cutoffDate = Date.from(cutoffLocalDateTime
.atZone(ZoneId.systemDefault())
.toInstant());
final List<JobInstance> jobInstancesByCutoff =
mySvc.fetchInstances(JOB_DEFINITION_ID, StatusEnum.getEndedStatuses(), cutoffDate, PageRequest.of(0, 100));
myCaptureQueriesListener.logSelectQueries();
myCaptureQueriesListener.getSelectQueries().forEach(query -> ourLog.info("query: {}", query.getSql(true, true)));
assertEquals(Set.of(sevenMinutesAgoId, eightMinutesAgoId),
jobInstancesByCutoff.stream()
.map(JobInstance::getInstanceId)
.collect(Collectors.toUnmodifiableSet()));
}
@Test
public void testFetchInstanceWithStatusAndCutoff_pages() {
final String job1 = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
final String job2 = storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
storeJobInstanceAndUpdateWithEndTime(StatusEnum.COMPLETED, 5);
final LocalDateTime cutoffLocalDateTime = LocalDateTime.now()
.minusMinutes(0);
final Date cutoffDate = Date.from(cutoffLocalDateTime
.atZone(ZoneId.systemDefault())
.toInstant());
final List<JobInstance> jobInstancesByCutoff =
mySvc.fetchInstances(JOB_DEFINITION_ID, StatusEnum.getEndedStatuses(), cutoffDate, PageRequest.of(0, 2));
assertEquals(Set.of(job1, job2),
jobInstancesByCutoff.stream()
.map(JobInstance::getInstanceId)
.collect(Collectors.toUnmodifiableSet()));
}
/**
* Returns a set of statuses, and whether they should be successfully picked up and started by a consumer.
* @return
@ -548,4 +631,29 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
return instance;
}
@Nonnull
private String storeJobInstanceAndUpdateWithEndTime(StatusEnum theStatus, int minutes) {
final JobInstance jobInstance = new JobInstance();
jobInstance.setJobDefinitionId(JOB_DEFINITION_ID);
jobInstance.setStatus(theStatus);
jobInstance.setJobDefinitionVersion(JOB_DEF_VER);
jobInstance.setParameters(CHUNK_DATA);
jobInstance.setReport("TEST");
final String id = mySvc.storeNewInstance(jobInstance);
jobInstance.setInstanceId(id);
final LocalDateTime localDateTime = LocalDateTime.now()
.minusMinutes(minutes);
ourLog.info("localDateTime: {}", localDateTime);
jobInstance.setEndTime(Date.from(localDateTime
.atZone(ZoneId.systemDefault())
.toInstant()));
mySvc.updateInstance(jobInstance);
return id;
}
}

View File

@ -49,6 +49,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -667,9 +668,11 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
assertNotNull(startResponse);
// Run a scheduled pass to build the export
myBatch2JobHelper.awaitJobCompletion(startResponse.getJobId());
myBatch2JobHelper.awaitJobCompletion(startResponse.getJobId(), 60);
await().until(() -> myJobRunner.getJobInfo(startResponse.getJobId()).getReport() != null);
await()
.atMost(120, TimeUnit.SECONDS)
.until(() -> myJobRunner.getJobInfo(startResponse.getJobId()).getReport() != null);
// Iterate over the files
String report = myJobRunner.getJobInfo(startResponse.getJobId()).getReport();

View File

@ -6,11 +6,15 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.BulkExportUtils;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.SearchParameterUtil;
@ -43,7 +47,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -51,9 +55,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
@ -61,12 +66,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
@ -164,7 +169,9 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
HttpGet statusGet = new HttpGet(pollingLocation);
String expectedOriginalUrl = myClient.getServerBase() + "/$export";
try (CloseableHttpResponse status = ourHttpClient.execute(statusGet)) {
assertEquals(200, status.getStatusLine().getStatusCode());
String responseContent = IOUtils.toString(status.getEntity().getContent(), StandardCharsets.UTF_8);
assertTrue(isNotBlank(responseContent), responseContent);
ourLog.info(responseContent);
@ -403,6 +410,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
@AfterEach
public void after() {
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.DISABLED);
myDaoConfig.setBulkExportFileMaximumCapacity(DaoConfig.DEFAULT_BULK_EXPORT_FILE_MAXIMUM_CAPACITY);
}
@Test
@ -430,6 +438,57 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
assertThat(typeToContents.get("Observation"), containsString("obs-included"));
assertThat(typeToContents.get("Observation"), not(containsString("obs-excluded")));
}
@Test
public void testBulkExportWithLowMaxFileCapacity() {
final int numPatients = 250;
myDaoConfig.setBulkExportFileMaximumCapacity(1);
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
RequestDetails details = new SystemRequestDetails();
List<String> patientIds = new ArrayList<>();
for(int i = 0; i < numPatients; i++){
String id = "p-"+i;
Patient patient = new Patient();
patient.setId(id);
myPatientDao.update(patient, details);
patientIds.add(id);
}
int patientsCreated = myPatientDao.search(SearchParameterMap.newSynchronous(), details).size();
assertEquals(numPatients, patientsCreated);
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
Batch2JobStartResponse job = myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(options));
myBatch2JobHelper.awaitJobCompletion(job.getJobId(), 60);
ourLog.debug("Job status after awaiting - {}", myJobRunner.getJobInfo(job.getJobId()).getStatus());
await()
.atMost(300, TimeUnit.SECONDS)
.until(() -> {
BulkExportJobStatusEnum status = myJobRunner.getJobInfo(job.getJobId()).getStatus();
if (!BulkExportJobStatusEnum.COMPLETE.equals(status)) {
fail("Job status was changed from COMPLETE to " + status);
}
return myJobRunner.getJobInfo(job.getJobId()).getReport() != null;
});
String report = myJobRunner.getJobInfo(job.getJobId()).getReport();
BulkExportJobResults results = JsonUtil.deserialize(report, BulkExportJobResults.class);
List<String> binaryUrls = results.getResourceTypeToBinaryIds().get("Patient");
IParser jsonParser = myFhirContext.newJsonParser();
for(String url : binaryUrls){
Binary binary = myClient.read().resource(Binary.class).withUrl(url).execute();
assertEquals(Constants.CT_FHIR_NDJSON, binary.getContentType());
String resourceContents = new String(binary.getContent(), Constants.CHARSET_UTF8);
String resourceId = jsonParser.parseResource(resourceContents).getIdElement().getIdPart();
assertTrue(patientIds.contains(resourceId));
}
}
}

View File

@ -2895,9 +2895,28 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
}
private void printQueryCount(String theMessage){
@Test
public void testDeleteResource_WithMassIngestionMode_enabled(){
myDaoConfig.setMassIngestionMode(true);
// given
Observation observation = new Observation()
.setStatus(Observation.ObservationStatus.FINAL)
.addCategory(new CodeableConcept().addCoding(new Coding("http://category-type", "12345", null)))
.setCode(new CodeableConcept().addCoding(new Coding("http://coverage-type", "12345", null)));
IIdType idDt = myObservationDao.create(observation, mySrd).getEntity().getIdDt();
// when
myCaptureQueriesListener.clear();
myObservationDao.delete(idDt, mySrd);
// then
assertQueryCount(3,1,1, 1);
}
private void printQueryCount(){
ourLog.info("QueryCount {} is: ", theMessage);
ourLog.info("\tselect: {}", myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
ourLog.info("\tupdate: {}", myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
ourLog.info("\tinsert: {}", myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
@ -2905,16 +2924,15 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
}
private void assertQueryCount(int theExpectedSelect, int theExpectedUpdate, int theExpectedInsert, int theExpectedDelete){
myCaptureQueriesListener.logSelectQueriesForCurrentThread();
private void assertQueryCount(int theExpectedSelectCount, int theExpectedUpdateCount, int theExpectedInsertCount, int theExpectedDeleteCount){
assertEquals(theExpectedSelect, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
assertEquals(theExpectedSelectCount, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
myCaptureQueriesListener.logUpdateQueriesForCurrentThread();
assertEquals(theExpectedUpdate, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
assertEquals(theExpectedUpdateCount, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
myCaptureQueriesListener.logInsertQueriesForCurrentThread();
assertEquals(theExpectedInsert, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
assertEquals(theExpectedInsertCount, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
myCaptureQueriesListener.logDeleteQueriesForCurrentThread();
assertEquals(theExpectedDelete, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size());
assertEquals(theExpectedDeleteCount, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size());
}
private Group createGroup(List<IIdType> theIIdTypeList) {

View File

@ -175,6 +175,7 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
myDaoConfig.setInternalSynchronousSearchSize(new DaoConfig().getInternalSynchronousSearchSize());
myModelConfig.setNormalizedQuantitySearchLevel(NormalizedQuantitySearchLevel.NORMALIZED_QUANTITY_SEARCH_NOT_SUPPORTED);
myDaoConfig.setHistoryCountMode(DaoConfig.DEFAULT_HISTORY_COUNT_MODE);
myDaoConfig.setMassIngestionMode(false);
}
private void assertGone(IIdType theId) {
@ -2498,6 +2499,35 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
}
@Test
public void testDeleteWithMassInjectionModeEnabled(){
myDaoConfig.setMassIngestionMode(true);
// given
Observation observation = new Observation()
.setStatus(ObservationStatus.FINAL)
.addCategory(newCodeableConcept("http://somesystem","somecode"))
.setCode(newCodeableConcept("http://loinc.org", "15074-8"));
// when
IIdType idDt = myObservationDao.create(observation, mySrd).getEntity().getIdDt();
myObservationDao.delete(idDt, mySrd);
// then
runInTransaction(() -> {
Long obsertionId = idDt.getIdPartAsLong();
Long resourceCurrentVersion = myResourceTableDao.findCurrentVersionByPid(obsertionId);
int resourceVersionCount = myResourceHistoryTableDao.findAllVersionsForResourceIdInOrder(obsertionId).size();
int indexedTokenCount = myResourceIndexedSearchParamTokenDao.countForResourceId(obsertionId);
assertThat(resourceCurrentVersion, equalTo(2L));
assertThat(resourceVersionCount, equalTo(2));
assertThat(indexedTokenCount, equalTo(0));
});
}
@Test
public void testPersistContactPoint() {
List<IAnyResource> found = toList(myPatientDao.search(new SearchParameterMap(Patient.SP_TELECOM, new TokenParam(null, "555-123-4567")).setLoadSynchronous(true)));

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.PatientReindexTestHelper;
@ -23,15 +24,19 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ReindexJobTest extends BaseJpaR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ReindexJobTest.class);
@Autowired
private IJobCoordinator myJobCoordinator;
@ -89,6 +94,41 @@ public class ReindexJobTest extends BaseJpaR4Test {
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(reindexPropertyCache);
}
@Test
public void testReindexDeletedResources_byUrl_willRemoveDeletedResourceEntriesFromIndexTables(){
IIdType obsId = myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);
runInTransaction(() -> {
int entriesInSpIndexTokenTable = myResourceIndexedSearchParamTokenDao.countForResourceId(obsId.getIdPartAsLong());
assertThat(entriesInSpIndexTokenTable, equalTo(1));
// simulate resource deletion
ResourceTable resource = myResourceTableDao.findById(obsId.getIdPartAsLong()).get();
Date currentDate = new Date();
resource.setDeleted(currentDate);
resource.setUpdated(currentDate);
resource.setHashSha256(null);
resource.setVersion(2L);
myResourceTableDao.save(resource);
});
// execute reindexing
ReindexJobParameters parameters = new ReindexJobParameters();
parameters.addUrl("Observation?status=final");
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(parameters);
Batch2JobStartResponse res = myJobCoordinator.startInstance(startRequest);
myBatch2JobHelper.awaitJobCompletion(res);
// then
runInTransaction(() -> {
int entriesInSpIndexTokenTablePostReindexing = myResourceIndexedSearchParamTokenDao.countForResourceId(obsId.getIdPartAsLong());
assertThat(entriesInSpIndexTokenTablePostReindexing, equalTo(0));
});
}
@Test
public void testReindex_Everything() {
// setup

Some files were not shown because too many files have changed in this diff Show More